Changeset - bf3546d1cd77
[Not reviewed]
default
0 5 0
Søren Løvborg - 9 years ago 2016-11-09 14:17:34
sorenl@unity3d.com
db: clean up SQLAlchemy session flushes

Many calls to Session().flush() were completely superfluous and have
been removed. (See also the note on "flush" in the contributor docs.)
For the remaining calls, a comment has been added to explain why it's
necessary.
5 files changed with 15 insertions and 19 deletions:
0 comments (0 inline, 0 general)
kallithea/lib/auth_modules/__init__.py
Show inline comments
 
@@ -99,337 +99,336 @@ class KallitheaAuthPluginBase(object):
 

	
 
        This property will trigger an automatic call to authenticate on
 
        a visit to the website or during a push/pull.
 

	
 
        :returns: bool
 
        """
 
        return False
 

	
 
    def accepts(self, user, accepts_empty=True):
 
        """
 
        Checks if this authentication module should accept a request for
 
        the current user.
 

	
 
        :param user: user object fetched using plugin's get_user() method.
 
        :param accepts_empty: if True accepts don't allow the user to be empty
 
        :returns: boolean
 
        """
 
        plugin_name = self.name
 
        if not user and not accepts_empty:
 
            log.debug('User is empty not allowed to authenticate')
 
            return False
 

	
 
        if user and user.extern_type and user.extern_type != plugin_name:
 
            log.debug('User %s should authenticate using %s this is %s, skipping',
 
                      user, user.extern_type, plugin_name)
 

	
 
            return False
 
        return True
 

	
 
    def get_user(self, username=None, **kwargs):
 
        """
 
        Helper method for user fetching in plugins, by default it's using
 
        simple fetch by username, but this method can be customized in plugins
 
        eg. container auth plugin to fetch user by environ params
 

	
 
        :param username: username if given to fetch from database
 
        :param kwargs: extra arguments needed for user fetching.
 
        """
 
        user = None
 
        log.debug('Trying to fetch user `%s` from Kallithea database',
 
                  username)
 
        if username:
 
            user = User.get_by_username_or_email(username)
 
            if user is None:
 
                log.debug('Fallback to fetch user in case insensitive mode')
 
                user = User.get_by_username(username, case_insensitive=True)
 
        else:
 
            log.debug('provided username:`%s` is empty skipping...', username)
 
        return user
 

	
 
    def settings(self):
 
        """
 
        Return a list of the form:
 
        [
 
            {
 
                "name": "OPTION_NAME",
 
                "type": "[bool|password|string|int|select]",
 
                ["values": ["opt1", "opt2", ...]]
 
                "validator": "expr"
 
                "description": "A short description of the option" [,
 
                "default": Default Value],
 
                ["formname": "Friendly Name for Forms"]
 
            } [, ...]
 
        ]
 

	
 
        This is used to interrogate the authentication plugin as to what
 
        settings it expects to be present and configured.
 

	
 
        'type' is a shorthand notation for what kind of value this option is.
 
        This is primarily used by the auth web form to control how the option
 
        is configured.
 
                bool : checkbox
 
                password : password input box
 
                string : input box
 
                select : single select dropdown
 

	
 
        'validator' is an lazy instantiated form field validator object, ala
 
        formencode. You need to *call* this object to init the validators.
 
        All calls to Kallithea validators should be used through self.validators
 
        which is a lazy loading proxy of formencode module.
 
        """
 
        raise NotImplementedError("Not implemented in base class")
 

	
 
    def plugin_settings(self):
 
        """
 
        This method is called by the authentication framework, not the .settings()
 
        method. This method adds a few default settings (e.g., "enabled"), so that
 
        plugin authors don't have to maintain a bunch of boilerplate.
 

	
 
        OVERRIDING THIS METHOD WILL CAUSE YOUR PLUGIN TO FAIL.
 
        """
 

	
 
        rcsettings = self.settings()
 
        rcsettings.insert(0, {
 
            "name": "enabled",
 
            "validator": self.validators.StringBoolean(if_missing=False),
 
            "type": "bool",
 
            "description": "Enable or Disable this Authentication Plugin",
 
            "formname": "Enabled"
 
            }
 
        )
 
        return rcsettings
 

	
 
    def user_activation_state(self):
 
        """
 
        Defines user activation state when creating new users
 

	
 
        :returns: boolean
 
        """
 
        raise NotImplementedError("Not implemented in base class")
 

	
 
    def auth(self, userobj, username, passwd, settings, **kwargs):
 
        """
 
        Given a user object (which may be None), username, a plaintext password,
 
        and a settings object (containing all the keys needed as listed in settings()),
 
        authenticate this user's login attempt.
 

	
 
        Return None on failure. On success, return a dictionary with keys from
 
        KallitheaAuthPluginBase.auth_func_attrs.
 

	
 
        This is later validated for correctness.
 
        """
 
        raise NotImplementedError("not implemented in base class")
 

	
 
    def _authenticate(self, userobj, username, passwd, settings, **kwargs):
 
        """
 
        Wrapper to call self.auth() that validates call on it
 

	
 
        :param userobj: userobj
 
        :param username: username
 
        :param passwd: plaintext password
 
        :param settings: plugin settings
 
        """
 
        user_data = self.auth(userobj, username, passwd, settings, **kwargs)
 
        if user_data is not None:
 
            return self._validate_auth_return(user_data)
 
        return None
 

	
 
    def _validate_auth_return(self, user_data):
 
        if not isinstance(user_data, dict):
 
            raise Exception('returned value from auth must be a dict')
 
        for k in self.auth_func_attrs:
 
            if k not in user_data:
 
                raise Exception('Missing %s attribute from returned data' % k)
 
        return user_data
 

	
 

	
 
class KallitheaExternalAuthPlugin(KallitheaAuthPluginBase):
 
    def use_fake_password(self):
 
        """
 
        Return a boolean that indicates whether or not we should set the user's
 
        password to a random value when it is authenticated by this plugin.
 
        If your plugin provides authentication, then you will generally want this.
 

	
 
        :returns: boolean
 
        """
 
        raise NotImplementedError("Not implemented in base class")
 

	
 
    def _authenticate(self, userobj, username, passwd, settings, **kwargs):
 
        user_data = super(KallitheaExternalAuthPlugin, self)._authenticate(
 
            userobj, username, passwd, settings, **kwargs)
 
        if user_data is not None:
 
            # maybe plugin will clean the username ?
 
            # we should use the return value
 
            username = user_data['username']
 
            # if user is not active from our extern type we should fail to auth
 
            # this can prevent from creating users in Kallithea when using
 
            # external authentication, but if it's inactive user we shouldn't
 
            # create that user anyway
 
            if user_data['active_from_extern'] is False:
 
                log.warning("User %s authenticated against %s, but is inactive",
 
                            username, self.__module__)
 
                return None
 

	
 
            if self.use_fake_password():
 
                # Randomize the PW because we don't need it, but don't want
 
                # them blank either
 
                passwd = PasswordGenerator().gen_password(length=8)
 

	
 
            log.debug('Updating or creating user info from %s plugin',
 
                      self.name)
 
            user = UserModel().create_or_update(
 
                username=username,
 
                password=passwd,
 
                email=user_data["email"],
 
                firstname=user_data["firstname"],
 
                lastname=user_data["lastname"],
 
                active=user_data["active"],
 
                admin=user_data["admin"],
 
                extern_name=user_data["extern_name"],
 
                extern_type=self.name
 
            )
 
            Session().flush()
 
            # enforce user is just in given groups, all of them has to be ones
 
            # created from plugins. We store this info in _group_data JSON field
 
            groups = user_data['groups'] or []
 
            UserGroupModel().enforce_groups(user, groups, self.name)
 
            Session().commit()
 
        return user_data
 

	
 

	
 
def importplugin(plugin):
 
    """
 
    Imports and returns the authentication plugin in the module named by plugin
 
    (e.g., plugin='kallithea.lib.auth_modules.auth_internal'). Returns the
 
    KallitheaAuthPluginBase subclass on success, raises exceptions on failure.
 

	
 
    raises:
 
        AttributeError -- no KallitheaAuthPlugin class in the module
 
        TypeError -- if the KallitheaAuthPlugin is not a subclass of ours KallitheaAuthPluginBase
 
        ImportError -- if we couldn't import the plugin at all
 
    """
 
    log.debug("Importing %s", plugin)
 
    if not plugin.startswith(u'kallithea.lib.auth_modules.auth_'):
 
        parts = plugin.split(u'.lib.auth_modules.auth_', 1)
 
        if len(parts) == 2:
 
            _module, pn = parts
 
            plugin = u'kallithea.lib.auth_modules.auth_' + pn
 
    PLUGIN_CLASS_NAME = "KallitheaAuthPlugin"
 
    try:
 
        module = importlib.import_module(plugin)
 
    except (ImportError, TypeError):
 
        log.error(traceback.format_exc())
 
        # TODO: make this more error prone, if by some accident we screw up
 
        # the plugin name, the crash is pretty bad and hard to recover
 
        raise
 

	
 
    log.debug("Loaded auth plugin from %s (module:%s, file:%s)",
 
              plugin, module.__name__, module.__file__)
 

	
 
    pluginclass = getattr(module, PLUGIN_CLASS_NAME)
 
    if not issubclass(pluginclass, KallitheaAuthPluginBase):
 
        raise TypeError("Authentication class %s.KallitheaAuthPlugin is not "
 
                        "a subclass of %s" % (plugin, KallitheaAuthPluginBase))
 
    return pluginclass
 

	
 

	
 
def loadplugin(plugin):
 
    """
 
    Loads and returns an instantiated authentication plugin.
 

	
 
        see: importplugin
 
    """
 
    plugin = importplugin(plugin)()
 
    if plugin.plugin_settings.im_func != KallitheaAuthPluginBase.plugin_settings.im_func:
 
        raise TypeError("Authentication class %s.KallitheaAuthPluginBase "
 
                        "has overridden the plugin_settings method, which is "
 
                        "forbidden." % plugin)
 
    return plugin
 

	
 

	
 
def authenticate(username, password, environ=None):
 
    """
 
    Authentication function used for access control,
 
    It tries to authenticate based on enabled authentication modules.
 

	
 
    :param username: username can be empty for container auth
 
    :param password: password can be empty for container auth
 
    :param environ: environ headers passed for container auth
 
    :returns: None if auth failed, user_data dict if auth is correct
 
    """
 

	
 
    auth_plugins = Setting.get_auth_plugins()
 
    log.debug('Authentication against %s plugins', auth_plugins)
 
    for module in auth_plugins:
 
        try:
 
            plugin = loadplugin(module)
 
        except (ImportError, AttributeError, TypeError) as e:
 
            raise ImportError('Failed to load authentication module %s : %s'
 
                              % (module, str(e)))
 
        log.debug('Trying authentication using ** %s **', module)
 
        # load plugin settings from Kallithea database
 
        plugin_name = plugin.name
 
        plugin_settings = {}
 
        for v in plugin.plugin_settings():
 
            conf_key = "auth_%s_%s" % (plugin_name, v["name"])
 
            setting = Setting.get_by_name(conf_key)
 
            plugin_settings[v["name"]] = setting.app_settings_value if setting else None
 
        log.debug('Plugin settings \n%s', formatted_json(plugin_settings))
 

	
 
        if not str2bool(plugin_settings["enabled"]):
 
            log.info("Authentication plugin %s is disabled, skipping for %s",
 
                     module, username)
 
            continue
 

	
 
        # use plugin's method of user extraction.
 
        user = plugin.get_user(username, environ=environ,
 
                               settings=plugin_settings)
 
        log.debug('Plugin %s extracted user is `%s`', module, user)
 
        if not plugin.accepts(user):
 
            log.debug('Plugin %s does not accept user `%s` for authentication',
 
                      module, user)
 
            continue
 
        else:
 
            log.debug('Plugin %s accepted user `%s` for authentication',
 
                      module, user)
 
            # The user might have tried to authenticate using their email address,
 
            # then the username variable wouldn't contain a valid username.
 
            # But as the plugin has accepted the user, .username field should
 
            # have a valid username, so use it for authentication purposes.
 
            if user is not None:
 
                username = user.username
 

	
 
        log.info('Authenticating user using %s plugin', plugin.__module__)
 

	
 
        # _authenticate is a wrapper for .auth() method of plugin.
 
        # it checks if .auth() sends proper data. For KallitheaExternalAuthPlugin
 
        # it also maps users to Database and maps the attributes returned
 
        # from .auth() to Kallithea database. If this function returns data
 
        # then auth is correct.
 
        user_data = plugin._authenticate(user, username, password,
 
                                           plugin_settings,
 
                                           environ=environ or {})
 
        log.debug('PLUGIN USER DATA: %s', user_data)
 

	
 
        if user_data is not None:
 
            log.debug('Plugin returned proper authentication data')
 
            return user_data
 

	
 
        # we failed to Auth because .auth() method didn't return the user
 
        if username:
 
            log.warning("User `%s` failed to authenticate against %s",
 
                        username, plugin.__module__)
 
    return None
 

	
 
def get_managed_fields(user):
 
    """return list of fields that are managed by the user's auth source, usually some of
 
    'username', 'firstname', 'lastname', 'email', 'active', 'password'
 
    """
 
    auth_plugins = Setting.get_auth_plugins()
 
    for module in auth_plugins:
 
        log.debug('testing %s (%s) with auth plugin %s', user, user.extern_type, module)
 
        plugin = loadplugin(module)
 
        if plugin.name == user.extern_type:
 
            return plugin.get_managed_fields()
 
    log.error('no auth plugin %s found for %s', user.extern_type, user)
 
    return [] # TODO: Fail badly instead of allowing everything to be edited?
kallithea/model/gist.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.gist
 
~~~~~~~~~~~~~~~~~~~~
 

	
 
gist model for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: May 9, 2013
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import os
 
import time
 
import logging
 
import traceback
 
import shutil
 

	
 
from kallithea.lib.utils2 import safe_unicode, unique_id, safe_int, \
 
    time_to_datetime, AttributeDict
 
from kallithea.lib.compat import json
 
from kallithea.model.base import BaseModel
 
from kallithea.model.db import Gist
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.scm import ScmModel
 

	
 
log = logging.getLogger(__name__)
 

	
 
GIST_STORE_LOC = '.rc_gist_store'
 
GIST_METADATA_FILE = '.rc_gist_metadata'
 

	
 

	
 
class GistModel(BaseModel):
 

	
 
    def _get_gist(self, gist):
 
        """
 
        Helper method to get gist by ID, or gist_access_id as a fallback
 

	
 
        :param gist: GistID, gist_access_id, or Gist instance
 
        """
 
        return Gist.guess_instance(gist, callback=Gist.get_by_access_id)
 

	
 
    def __delete_gist(self, gist):
 
        """
 
        removes gist from filesystem
 

	
 
        :param gist: gist object
 
        """
 
        root_path = RepoModel().repos_path
 
        rm_path = os.path.join(root_path, GIST_STORE_LOC, gist.gist_access_id)
 
        log.info("Removing %s", rm_path)
 
        shutil.rmtree(rm_path)
 

	
 
    def _store_metadata(self, repo, gist_id, gist_access_id, user_id, gist_type,
 
                        gist_expires):
 
        """
 
        store metadata inside the gist, this can be later used for imports
 
        or gist identification
 
        """
 
        metadata = {
 
            'metadata_version': '1',
 
            'gist_db_id': gist_id,
 
            'gist_access_id': gist_access_id,
 
            'gist_owner_id': user_id,
 
            'gist_type': gist_type,
 
            'gist_expires': gist_expires,
 
            'gist_updated': time.time(),
 
        }
 
        with open(os.path.join(repo.path, '.hg', GIST_METADATA_FILE), 'wb') as f:
 
            f.write(json.dumps(metadata))
 

	
 
    def get_gist(self, gist):
 
        return self._get_gist(gist)
 

	
 
    def get_gist_files(self, gist_access_id, revision=None):
 
        """
 
        Get files for given gist
 

	
 
        :param gist_access_id:
 
        """
 
        repo = Gist.get_by_access_id(gist_access_id)
 
        cs = repo.scm_instance.get_changeset(revision)
 
        return cs, [n for n in cs.get_node('/')]
 

	
 
    def create(self, description, owner, gist_mapping,
 
               gist_type=Gist.GIST_PUBLIC, lifetime=-1):
 
        """
 

	
 
        :param description: description of the gist
 
        :param owner: user who created this gist
 
        :param gist_mapping: mapping {filename:{'content':content},...}
 
        :param gist_type: type of gist private/public
 
        :param lifetime: in minutes, -1 == forever
 
        """
 
        owner = self._get_user(owner)
 
        gist_id = safe_unicode(unique_id(20))
 
        lifetime = safe_int(lifetime, -1)
 
        gist_expires = time.time() + (lifetime * 60) if lifetime != -1 else -1
 
        log.debug('set GIST expiration date to: %s',
 
                  time_to_datetime(gist_expires)
 
                   if gist_expires != -1 else 'forever')
 
        #create the Database version
 
        gist = Gist()
 
        gist.gist_description = description
 
        gist.gist_access_id = gist_id
 
        gist.gist_owner = owner.user_id
 
        gist.gist_expires = gist_expires
 
        gist.gist_type = safe_unicode(gist_type)
 
        self.sa.add(gist)
 
        self.sa.flush()
 
        self.sa.flush() # make database assign gist.gist_id
 
        if gist_type == Gist.GIST_PUBLIC:
 
            # use DB ID for easy to use GIST ID
 
            gist_id = safe_unicode(gist.gist_id)
 
            gist.gist_access_id = gist_id
 
            self.sa.add(gist)
 

	
 
        gist_repo_path = os.path.join(GIST_STORE_LOC, gist_id)
 
        log.debug('Creating new %s GIST repo in %s', gist_type, gist_repo_path)
 
        repo = RepoModel()._create_filesystem_repo(
 
            repo_name=gist_id, repo_type='hg', repo_group=GIST_STORE_LOC)
 

	
 
        processed_mapping = {}
 
        for filename in gist_mapping:
 
            if filename != os.path.basename(filename):
 
                raise Exception('Filename cannot be inside a directory')
 

	
 
            content = gist_mapping[filename]['content']
 
            #TODO: expand support for setting explicit lexers
 
#             if lexer is None:
 
#                 try:
 
#                     guess_lexer = pygments.lexers.guess_lexer_for_filename
 
#                     lexer = guess_lexer(filename,content)
 
#                 except pygments.util.ClassNotFound:
 
#                     lexer = 'text'
 
            processed_mapping[filename] = {'content': content}
 

	
 
        # now create single multifile commit
 
        message = 'added file'
 
        message += 's: ' if len(processed_mapping) > 1 else ': '
 
        message += ', '.join([x for x in processed_mapping])
 

	
 
        #fake Kallithea Repository object
 
        fake_repo = AttributeDict(dict(
 
            repo_name=gist_repo_path,
 
            scm_instance_no_cache=lambda: repo,
 
        ))
 
        ScmModel().create_nodes(
 
            user=owner.user_id, repo=fake_repo,
 
            message=message,
 
            nodes=processed_mapping,
 
            trigger_push_hook=False
 
        )
 

	
 
        self._store_metadata(repo, gist.gist_id, gist.gist_access_id,
 
                             owner.user_id, gist.gist_type, gist.gist_expires)
 
        return gist
 

	
 
    def delete(self, gist, fs_remove=True):
 
        gist = self._get_gist(gist)
 
        try:
 
            self.sa.delete(gist)
 
            if fs_remove:
 
                self.__delete_gist(gist)
 
            else:
 
                log.debug('skipping removal from filesystem')
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def update(self, gist, description, owner, gist_mapping, gist_type,
 
               lifetime):
 
        gist = self._get_gist(gist)
 
        gist_repo = gist.scm_instance
 

	
 
        lifetime = safe_int(lifetime, -1)
 
        if lifetime == 0:  # preserve old value
 
            gist_expires = gist.gist_expires
 
        else:
 
            gist_expires = time.time() + (lifetime * 60) if lifetime != -1 else -1
 

	
 
        #calculate operation type based on given data
 
        gist_mapping_op = {}
 
        for k, v in gist_mapping.items():
 
            # add, mod, del
 
            if not v['org_filename'] and v['filename']:
 
                op = 'add'
 
            elif v['org_filename'] and not v['filename']:
 
                op = 'del'
 
            else:
 
                op = 'mod'
 

	
 
            v['op'] = op
 
            gist_mapping_op[k] = v
 

	
 
        gist.gist_description = description
 
        gist.gist_expires = gist_expires
 
        gist.owner = owner
 
        gist.gist_type = gist_type
 
        self.sa.add(gist)
 
        self.sa.flush()
 

	
 
        message = 'updated file'
 
        message += 's: ' if len(gist_mapping) > 1 else ': '
 
        message += ', '.join([x for x in gist_mapping])
 

	
 
        #fake Kallithea Repository object
 
        fake_repo = AttributeDict(dict(
 
            repo_name=gist_repo.path,
 
            scm_instance_no_cache=lambda: gist_repo,
 
        ))
 

	
 
        self._store_metadata(gist_repo, gist.gist_id, gist.gist_access_id,
 
                             owner.user_id, gist.gist_type, gist.gist_expires)
 

	
 
        ScmModel().update_nodes(
 
            user=owner.user_id,
 
            repo=fake_repo,
 
            message=message,
 
            nodes=gist_mapping_op,
 
            trigger_push_hook=False
 
        )
 

	
 
        return gist
kallithea/model/user.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.user
 
~~~~~~~~~~~~~~~~~~~~
 

	
 
users model for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 9, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 

	
 
import hashlib
 
import hmac
 
import logging
 
import time
 
import traceback
 

	
 
from pylons import config
 
from pylons.i18n.translation import _
 

	
 
from sqlalchemy.exc import DatabaseError
 

	
 
from kallithea.lib.utils2 import safe_str, generate_api_key, get_current_authuser
 
from kallithea.lib.caching_query import FromCache
 
from kallithea.model.base import BaseModel
 
from kallithea.model.db import User, UserToPerm, Notification, \
 
    UserEmailMap, UserIpMap
 
from kallithea.lib.exceptions import DefaultUserException, \
 
    UserOwnsReposException
 
from kallithea.model.meta import Session
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class UserModel(BaseModel):
 
    password_reset_token_lifetime = 86400 # 24 hours
 

	
 
    def get(self, user_id, cache=False):
 
        user = self.sa.query(User)
 
        if cache:
 
            user = user.options(FromCache("sql_cache_short",
 
                                          "get_user_%s" % user_id))
 
        return user.get(user_id)
 

	
 
    def get_user(self, user):
 
        return self._get_user(user)
 

	
 
    def create(self, form_data, cur_user=None):
 
        if not cur_user:
 
            cur_user = getattr(get_current_authuser(), 'username', None)
 

	
 
        from kallithea.lib.hooks import log_create_user, \
 
            check_allowed_create_user
 
        _fd = form_data
 
        user_data = {
 
            'username': _fd['username'],
 
            'password': _fd['password'],
 
            'email': _fd['email'],
 
            'firstname': _fd['firstname'],
 
            'lastname': _fd['lastname'],
 
            'active': _fd['active'],
 
            'admin': False
 
        }
 
        # raises UserCreationError if it's not allowed
 
        check_allowed_create_user(user_data, cur_user)
 
        from kallithea.lib.auth import get_crypt_password
 

	
 
        new_user = User()
 
        for k, v in form_data.items():
 
            if k == 'password':
 
                v = get_crypt_password(v)
 
            if k == 'firstname':
 
                k = 'name'
 
            setattr(new_user, k, v)
 

	
 
        new_user.api_key = generate_api_key()
 
        self.sa.add(new_user)
 
        Session().add(new_user)
 
        Session().flush() # make database assign new_user.user_id
 

	
 
        log_create_user(new_user.get_dict(), cur_user)
 
        return new_user
 

	
 
    def create_or_update(self, username, password, email, firstname=u'',
 
                         lastname=u'', active=True, admin=False,
 
                         extern_type=None, extern_name=None, cur_user=None):
 
        """
 
        Creates a new instance if not found, or updates current one
 

	
 
        :param username:
 
        :param password:
 
        :param email:
 
        :param active:
 
        :param firstname:
 
        :param lastname:
 
        :param active:
 
        :param admin:
 
        :param extern_name:
 
        :param extern_type:
 
        :param cur_user:
 
        """
 
        if not cur_user:
 
            cur_user = getattr(get_current_authuser(), 'username', None)
 

	
 
        from kallithea.lib.auth import get_crypt_password, check_password
 
        from kallithea.lib.hooks import log_create_user, \
 
            check_allowed_create_user
 
        user_data = {
 
            'username': username, 'password': password,
 
            'email': email, 'firstname': firstname, 'lastname': lastname,
 
            'active': active, 'admin': admin
 
        }
 
        # raises UserCreationError if it's not allowed
 
        check_allowed_create_user(user_data, cur_user)
 

	
 
        log.debug('Checking for %s account in Kallithea database', username)
 
        user = User.get_by_username(username, case_insensitive=True)
 
        if user is None:
 
            log.debug('creating new user %s', username)
 
            new_user = User()
 
            edit = False
 
        else:
 
            log.debug('updating user %s', username)
 
            new_user = user
 
            edit = True
 

	
 
        try:
 
            new_user.username = username
 
            new_user.admin = admin
 
            new_user.email = email
 
            new_user.active = active
 
            new_user.extern_name = safe_str(extern_name) \
 
                if extern_name else None
 
            new_user.extern_type = safe_str(extern_type) \
 
                if extern_type else None
 
            new_user.name = firstname
 
            new_user.lastname = lastname
 

	
 
            if not edit:
 
                new_user.api_key = generate_api_key()
 

	
 
            # set password only if creating an user or password is changed
 
            password_change = new_user.password and \
 
                not check_password(password, new_user.password)
 
            if not edit or password_change:
 
                reason = 'new password' if edit else 'new user'
 
                log.debug('Updating password reason=>%s', reason)
 
                new_user.password = get_crypt_password(password) \
 
                    if password else ''
 

	
 
            self.sa.add(new_user)
 
            if user is None:
 
                Session().add(new_user)
 
                Session().flush() # make database assign new_user.user_id
 

	
 
            if not edit:
 
                log_create_user(new_user.get_dict(), cur_user)
 

	
 
            return new_user
 
        except (DatabaseError,):
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def create_registration(self, form_data):
 
        from kallithea.model.notification import NotificationModel
 
        import kallithea.lib.helpers as h
 

	
 
        form_data['admin'] = False
 
        form_data['extern_type'] = User.DEFAULT_AUTH_TYPE
 
        form_data['extern_name'] = ''
 
        new_user = self.create(form_data)
 

	
 
        self.sa.add(new_user)
 
        self.sa.flush()
 

	
 
        # notification to admins
 
        subject = _('New user registration')
 
        body = (
 
            u'New user registration\n'
 
            '---------------------\n'
 
            '- Username: {user.username}\n'
 
            '- Full Name: {user.full_name}\n'
 
            '- Email: {user.email}\n'
 
            ).format(user=new_user)
 
        edit_url = h.canonical_url('edit_user', id=new_user.user_id)
 
        email_kwargs = {
 
            'registered_user_url': edit_url,
 
            'new_username': new_user.username,
 
            'new_email': new_user.email,
 
            'new_full_name': new_user.full_name}
 
        NotificationModel().create(created_by=new_user, subject=subject,
 
                                   body=body, recipients=None,
 
                                   type_=Notification.TYPE_REGISTRATION,
 
                                   email_kwargs=email_kwargs)
 

	
 
    def update(self, user_id, form_data, skip_attrs=None):
 
        from kallithea.lib.auth import get_crypt_password
 
        skip_attrs = skip_attrs or []
 
        user = self.get(user_id, cache=False)
 
        if user.username == User.DEFAULT_USER:
 
            raise DefaultUserException(
 
                            _("You can't edit this user since it's "
 
                              "crucial for entire application"))
 

	
 
        for k, v in form_data.items():
 
            if k in skip_attrs:
 
                continue
 
            if k == 'new_password' and v:
 
                user.password = get_crypt_password(v)
 
            else:
 
                # old legacy thing orm models store firstname as name,
 
                # need proper refactor to username
 
                if k == 'firstname':
 
                    k = 'name'
 
                setattr(user, k, v)
 
        self.sa.add(user)
 

	
 
    def update_user(self, user, **kwargs):
 
        from kallithea.lib.auth import get_crypt_password
 

	
 
        user = self._get_user(user)
 
        if user.username == User.DEFAULT_USER:
 
            raise DefaultUserException(
 
                _("You can't edit this user since it's"
 
                  " crucial for entire application")
 
            )
 

	
 
        for k, v in kwargs.items():
 
            if k == 'password' and v:
 
                v = get_crypt_password(v)
 

	
 
            setattr(user, k, v)
 
        self.sa.add(user)
 
        return user
 

	
 
    def delete(self, user, cur_user=None):
 
        if cur_user is None:
 
            cur_user = getattr(get_current_authuser(), 'username', None)
 
        user = self._get_user(user)
 

	
 
        if user.username == User.DEFAULT_USER:
 
            raise DefaultUserException(
 
                _("You can't remove this user since it is"
 
                  " crucial for the entire application"))
 
        if user.repositories:
 
            repos = [x.repo_name for x in user.repositories]
 
            raise UserOwnsReposException(
 
                _('User "%s" still owns %s repositories and cannot be '
 
                  'removed. Switch owners or remove those repositories: %s')
 
                % (user.username, len(repos), ', '.join(repos)))
 
        if user.repo_groups:
 
            repogroups = [x.group_name for x in user.repo_groups]
 
            raise UserOwnsReposException(_(
 
                'User "%s" still owns %s repository groups and cannot be '
 
                'removed. Switch owners or remove those repository groups: %s')
 
                % (user.username, len(repogroups), ', '.join(repogroups)))
 
        if user.user_groups:
 
            usergroups = [x.users_group_name for x in user.user_groups]
 
            raise UserOwnsReposException(
 
                _('User "%s" still owns %s user groups and cannot be '
 
                  'removed. Switch owners or remove those user groups: %s')
 
                % (user.username, len(usergroups), ', '.join(usergroups)))
 
        self.sa.delete(user)
 

	
 
        from kallithea.lib.hooks import log_delete_user
 
        log_delete_user(user.get_dict(), cur_user)
 

	
 
    def can_change_password(self, user):
 
        from kallithea.lib import auth_modules
 
        managed_fields = auth_modules.get_managed_fields(user)
 
        return 'password' not in managed_fields
 

	
 
    def get_reset_password_token(self, user, timestamp, session_id):
 
        """
 
        The token is a 40-digit hexstring, calculated as a HMAC-SHA1.
 

	
 
        In a traditional HMAC scenario, an attacker is unable to know or
 
        influence the secret key, but can know or influence the message
 
        and token. This scenario is slightly different (in particular
 
        since the message sender is also the message recipient), but
 
        sufficiently similar to use an HMAC. Benefits compared to a plain
 
        SHA1 hash includes resistance against a length extension attack.
 

	
 
        The HMAC key consists of the following values (known only to the
 
        server and authorized users):
 

	
 
        * per-application secret (the `app_instance_uuid` setting), without
 
          which an attacker cannot counterfeit tokens
 
        * hashed user password, invalidating the token upon password change
 

	
 
        The HMAC message consists of the following values (potentially known
 
        to an attacker):
 

	
 
        * session ID (the anti-CSRF token), requiring an attacker to have
 
          access to the browser session in which the token was created
 
        * numeric user ID, limiting the token to a specific user (yet allowing
 
          users to be renamed)
 
        * user email address
 
        * time of token issue (a Unix timestamp, to enable token expiration)
 

	
 
        The key and message values are separated by NUL characters, which are
 
        guaranteed not to occur in any of the values.
 
        """
 
        app_secret = config.get('app_instance_uuid')
 
        return hmac.HMAC(
 
            key=u'\0'.join([app_secret, user.password]).encode('utf-8'),
 
            msg=u'\0'.join([session_id, str(user.user_id), user.email, str(timestamp)]).encode('utf-8'),
 
            digestmod=hashlib.sha1,
 
        ).hexdigest()
 

	
 
    def send_reset_password_email(self, data):
 
        """
 
        Sends email with a password reset token and link to the password
 
        reset confirmation page with all information (including the token)
 
        pre-filled. Also returns URL of that page, only without the token,
 
        allowing users to copy-paste or manually enter the token from the
 
        email.
 
        """
 
        from kallithea.lib.celerylib import tasks
 
        from kallithea.model.notification import EmailNotificationModel
 
        import kallithea.lib.helpers as h
 

	
 
        user_email = data['email']
 
        user = User.get_by_email(user_email)
 
        timestamp = int(time.time())
 
        if user is not None:
 
            if self.can_change_password(user):
 
                log.debug('password reset user %s found', user)
 
                token = self.get_reset_password_token(user,
 
                                                      timestamp,
 
                                                      h.authentication_token())
 
                # URL must be fully qualified; but since the token is locked to
 
                # the current browser session, we must provide a URL with the
 
                # current scheme and hostname, rather than the canonical_url.
 
                link = h.url('reset_password_confirmation', qualified=True,
 
                             email=user_email,
 
                             timestamp=timestamp,
 
                             token=token)
 
            else:
 
                log.debug('password reset user %s found but was managed', user)
 
                token = link = None
 
            reg_type = EmailNotificationModel.TYPE_PASSWORD_RESET
 
            body = EmailNotificationModel().get_email_tmpl(
 
                reg_type, 'txt',
 
                user=user.short_contact,
 
                reset_token=token,
 
                reset_url=link)
 
            html_body = EmailNotificationModel().get_email_tmpl(
 
                reg_type, 'html',
 
                user=user.short_contact,
 
                reset_token=token,
 
                reset_url=link)
 
            log.debug('sending email')
 
            tasks.send_email([user_email], _("Password reset link"), body, html_body)
 
            log.info('send new password mail to %s', user_email)
 
        else:
 
            log.debug("password reset email %s not found", user_email)
 

	
 
        return h.url('reset_password_confirmation',
 
                     email=user_email,
 
                     timestamp=timestamp)
 

	
 
    def verify_reset_password_token(self, email, timestamp, token):
 
        from kallithea.lib.celerylib import tasks
 
        from kallithea.lib import auth
 
        import kallithea.lib.helpers as h
 
        user = User.get_by_email(email)
kallithea/model/user_group.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.user_group
 
~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
user group model for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Oct 1, 2011
 
:author: nvinot, marcink
 
"""
 

	
 

	
 
import logging
 
import traceback
 

	
 
from kallithea.model.base import BaseModel
 
from kallithea.model.db import UserGroupMember, UserGroup, \
 
    UserGroupRepoToPerm, Permission, UserGroupToPerm, User, UserUserGroupToPerm, \
 
    UserGroupUserGroupToPerm
 
from kallithea.lib.exceptions import UserGroupsAssignedException, \
 
    RepoGroupAssignmentError
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class UserGroupModel(BaseModel):
 

	
 
    def _get_user_group(self, user_group):
 
        return UserGroup.guess_instance(user_group,
 
                                  callback=UserGroup.get_by_group_name)
 

	
 
    def _create_default_perms(self, user_group):
 
        # create default permission
 
        default_perm = 'usergroup.read'
 
        def_user = User.get_default_user()
 
        for p in def_user.user_perms:
 
            if p.permission.permission_name.startswith('usergroup.'):
 
                default_perm = p.permission.permission_name
 
                break
 

	
 
        user_group_to_perm = UserUserGroupToPerm()
 
        user_group_to_perm.permission = Permission.get_by_key(default_perm)
 

	
 
        user_group_to_perm.user_group = user_group
 
        user_group_to_perm.user_id = def_user.user_id
 
        return user_group_to_perm
 

	
 
    def _update_permissions(self, user_group, perms_new=None,
 
                            perms_updates=None):
 
        from kallithea.lib.auth import HasUserGroupPermissionAny
 
        if not perms_new:
 
            perms_new = []
 
        if not perms_updates:
 
            perms_updates = []
 

	
 
        # update permissions
 
        for member, perm, member_type in perms_updates:
 
            if member_type == 'user':
 
                # this updates existing one
 
                self.grant_user_permission(
 
                    user_group=user_group, user=member, perm=perm
 
                )
 
            else:
 
                #check if we have permissions to alter this usergroup
 
                if HasUserGroupPermissionAny('usergroup.read', 'usergroup.write',
 
                                             'usergroup.admin')(member):
 
                    self.grant_user_group_permission(
 
                        target_user_group=user_group, user_group=member, perm=perm
 
                    )
 
        # set new permissions
 
        for member, perm, member_type in perms_new:
 
            if member_type == 'user':
 
                self.grant_user_permission(
 
                    user_group=user_group, user=member, perm=perm
 
                )
 
            else:
 
                #check if we have permissions to alter this usergroup
 
                if HasUserGroupPermissionAny('usergroup.read', 'usergroup.write',
 
                                             'usergroup.admin')(member):
 
                    self.grant_user_group_permission(
 
                        target_user_group=user_group, user_group=member, perm=perm
 
                    )
 

	
 
    def get(self, user_group_id, cache=False):
 
        return UserGroup.get(user_group_id)
 

	
 
    def get_group(self, user_group):
 
        return self._get_user_group(user_group)
 

	
 
    def get_by_name(self, name, cache=False, case_insensitive=False):
 
        return UserGroup.get_by_group_name(name, cache, case_insensitive)
 

	
 
    def create(self, name, description, owner, active=True, group_data=None):
 
        try:
 
            new_user_group = UserGroup()
 
            new_user_group.owner = self._get_user(owner)
 
            new_user_group.users_group_name = name
 
            new_user_group.user_group_description = description
 
            new_user_group.users_group_active = active
 
            if group_data:
 
                new_user_group.group_data = group_data
 
            self.sa.add(new_user_group)
 
            perm_obj = self._create_default_perms(new_user_group)
 
            self.sa.add(perm_obj)
 

	
 
            self.grant_user_permission(user_group=new_user_group,
 
                                       user=owner, perm='usergroup.admin')
 

	
 
            return new_user_group
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def update(self, user_group, form_data):
 

	
 
        try:
 
            user_group = self._get_user_group(user_group)
 

	
 
            for k, v in form_data.items():
 
                if k == 'users_group_members':
 
                    user_group.members = []
 
                    self.sa.flush()
 
                    members_list = []
 
                    if v:
 
                        v = [v] if isinstance(v, basestring) else v
 
                        for u_id in set(v):
 
                            member = UserGroupMember(user_group.users_group_id, u_id)
 
                            members_list.append(member)
 
                    setattr(user_group, 'members', members_list)
 
                            self.sa.add(member)
 
                    user_group.members = members_list
 
                setattr(user_group, k, v)
 

	
 
            self.sa.add(user_group)
 
            # Flush to make db assign users_group_member_id to newly
 
            # created UserGroupMembers.
 
            self.sa.flush()
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def delete(self, user_group, force=False):
 
        """
 
        Deletes user group, unless force flag is used
 
        raises exception if there are members in that group, else deletes
 
        group and users
 

	
 
        :param user_group:
 
        :param force:
 
        """
 
        user_group = self._get_user_group(user_group)
 
        try:
 
            # check if this group is not assigned to repo
 
            assigned_groups = UserGroupRepoToPerm.query() \
 
                .filter(UserGroupRepoToPerm.users_group == user_group).all()
 
            assigned_groups = [x.repository.repo_name for x in assigned_groups]
 

	
 
            if assigned_groups and not force:
 
                raise UserGroupsAssignedException(
 
                    'User Group assigned to %s' % ", ".join(assigned_groups))
 
            self.sa.delete(user_group)
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def add_user_to_group(self, user_group, user):
 
        user_group = self._get_user_group(user_group)
 
        user = self._get_user(user)
 

	
 
        for m in user_group.members:
 
            u = m.user
 
            if u.user_id == user.user_id:
 
                # user already in the group, skip
 
                return True
 

	
 
        try:
 
            user_group_member = UserGroupMember()
 
            user_group_member.user = user
 
            user_group_member.users_group = user_group
 

	
 
            user_group.members.append(user_group_member)
 
            user.group_member.append(user_group_member)
 

	
 
            self.sa.add(user_group_member)
 
            return user_group_member
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def remove_user_from_group(self, user_group, user):
 
        user_group = self._get_user_group(user_group)
 
        user = self._get_user(user)
 

	
 
        user_group_member = None
 
        for m in user_group.members:
 
            if m.user_id == user.user_id:
 
                # Found this user's membership row
 
                user_group_member = m
 
                break
 

	
 
        if user_group_member:
 
            try:
 
                self.sa.delete(user_group_member)
 
                return True
 
            except Exception:
 
                log.error(traceback.format_exc())
 
                raise
 
        else:
 
            # User isn't in that group
 
            return False
 

	
 
    def has_perm(self, user_group, perm):
 
        user_group = self._get_user_group(user_group)
 
        perm = self._get_perm(perm)
 

	
 
        return UserGroupToPerm.query() \
 
            .filter(UserGroupToPerm.users_group == user_group) \
 
            .filter(UserGroupToPerm.permission == perm).scalar() is not None
 

	
 
    def grant_perm(self, user_group, perm):
 
        user_group = self._get_user_group(user_group)
 
        perm = self._get_perm(perm)
 

	
 
        # if this permission is already granted skip it
 
        _perm = UserGroupToPerm.query() \
 
            .filter(UserGroupToPerm.users_group == user_group) \
 
            .filter(UserGroupToPerm.permission == perm) \
 
            .scalar()
 
        if _perm:
 
            return
 

	
 
        new = UserGroupToPerm()
 
        new.users_group = user_group
 
        new.permission = perm
 
        self.sa.add(new)
 
        return new
 

	
 
    def revoke_perm(self, user_group, perm):
 
        user_group = self._get_user_group(user_group)
 
        perm = self._get_perm(perm)
 

	
 
        obj = UserGroupToPerm.query() \
 
            .filter(UserGroupToPerm.users_group == user_group) \
 
            .filter(UserGroupToPerm.permission == perm).scalar()
 
        if obj is not None:
 
            self.sa.delete(obj)
 

	
 
    def grant_user_permission(self, user_group, user, perm):
 
        """
 
        Grant permission for user on given user group, or update
 
        existing one if found
 

	
 
        :param user_group: Instance of UserGroup, users_group_id,
 
            or users_group_name
 
        :param user: Instance of User, user_id or username
 
        :param perm: Instance of Permission, or permission_name
 
        """
 

	
 
        user_group = self._get_user_group(user_group)
 
        user = self._get_user(user)
 
        permission = self._get_perm(perm)
 

	
 
        # check if we have that permission already
 
        obj = self.sa.query(UserUserGroupToPerm) \
 
            .filter(UserUserGroupToPerm.user == user) \
 
            .filter(UserUserGroupToPerm.user_group == user_group) \
 
            .scalar()
 
        if obj is None:
 
            # create new !
 
            obj = UserUserGroupToPerm()
 
        obj.user_group = user_group
 
        obj.user = user
 
        obj.permission = permission
 
        self.sa.add(obj)
 
        log.debug('Granted perm %s to %s on %s', perm, user, user_group)
 
        return obj
 

	
 
    def revoke_user_permission(self, user_group, user):
 
        """
 
        Revoke permission for user on given repository group
 

	
 
        :param user_group: Instance of RepoGroup, repositories_group_id,
 
            or repositories_group name
 
        :param user: Instance of User, user_id or username
 
        """
 

	
 
        user_group = self._get_user_group(user_group)
 
        user = self._get_user(user)
 

	
 
        obj = self.sa.query(UserUserGroupToPerm) \
 
            .filter(UserUserGroupToPerm.user == user) \
 
            .filter(UserUserGroupToPerm.user_group == user_group) \
 
            .scalar()
 
        if obj is not None:
 
            self.sa.delete(obj)
 
            log.debug('Revoked perm on %s on %s', user_group, user)
 

	
 
    def grant_user_group_permission(self, target_user_group, user_group, perm):
 
        """
 
        Grant user group permission for given target_user_group
 

	
 
        :param target_user_group:
 
        :param user_group:
 
        :param perm:
 
        """
 
        target_user_group = self._get_user_group(target_user_group)
 
        user_group = self._get_user_group(user_group)
 
        permission = self._get_perm(perm)
 
        # forbid assigning same user group to itself
 
        if target_user_group == user_group:
 
            raise RepoGroupAssignmentError('target repo:%s cannot be '
 
                                           'assigned to itself' % target_user_group)
 

	
 
        # check if we have that permission already
 
        obj = self.sa.query(UserGroupUserGroupToPerm) \
 
            .filter(UserGroupUserGroupToPerm.target_user_group == target_user_group) \
 
            .filter(UserGroupUserGroupToPerm.user_group == user_group) \
 
            .scalar()
 
        if obj is None:
 
            # create new !
 
            obj = UserGroupUserGroupToPerm()
 
        obj.user_group = user_group
 
        obj.target_user_group = target_user_group
 
        obj.permission = permission
 
        self.sa.add(obj)
 
        log.debug('Granted perm %s to %s on %s', perm, target_user_group, user_group)
 
        return obj
 

	
 
    def revoke_user_group_permission(self, target_user_group, user_group):
kallithea/tests/models/test_settings.py
Show inline comments
 
from kallithea.model.meta import Session
 
from kallithea.model.db import Setting
 

	
 

	
 
name = 'spam-setting-name'
 

	
 
def test_passing_list_setting_value_results_in_string_valued_setting():
 
    assert Setting.get_by_name(name) is None
 
    setting = Setting.create_or_update(name, ['spam', 'eggs'])
 
    Session().flush()
 
    Session().flush() # must flush so we can delete it below
 
    try:
 
        assert Setting.get_by_name(name) is not None
 
        # Quirk: list value is stringified.
 
        assert Setting.get_by_name(name).app_settings_value \
 
               == "['spam', 'eggs']"
 
        assert Setting.get_by_name(name).app_settings_type == 'unicode'
 
    finally:
 
        Session().delete(setting)
 
        Session().flush()
 

	
 
def test_list_valued_setting_creation_requires_manual_value_formatting():
 
    assert Setting.get_by_name(name) is None
 
    # Quirk: need manual formatting of list setting value.
 
    setting = Setting.create_or_update(name, 'spam,eggs', type='list')
 
    Session().flush()
 
    Session().flush() # must flush so we can delete it below
 
    try:
 
        assert setting.app_settings_value == ['spam', 'eggs']
 
    finally:
 
        Session().delete(setting)
 
        Session().flush()
 

	
 
def test_list_valued_setting_update():
 
    assert Setting.get_by_name(name) is None
 
    setting = Setting.create_or_update(name, 'spam', type='list')
 
    Session().flush()
 
    Session().flush() # must flush so we can delete it below
 
    try:
 
        assert setting.app_settings_value == [u'spam']
 
        # Assign back setting value.
 
        setting.app_settings_value = setting.app_settings_value
 
        # Quirk: value is stringified on write and listified on read.
 
        assert setting.app_settings_value == ["[u'spam']"]
 
        setting.app_settings_value = setting.app_settings_value
 
        assert setting.app_settings_value == ["[u\"[u'spam']\"]"]
 
    finally:
 
        Session().delete(setting)
 
        Session().flush()
0 comments (0 inline, 0 general)