Changeset - aa093e05a1c6
[Not reviewed]
default
0 2 0
Mads Kiilerich - 6 years ago 2020-02-05 23:03:39
mads@kiilerich.com
py3: automatic migration with 2to3 -f raw_input
2 files changed with 4 insertions and 4 deletions:
0 comments (0 inline, 0 general)
kallithea/lib/db_manage.py
Show inline comments
 
@@ -144,202 +144,202 @@ class DbManage(object):
 

	
 
        self.sa.commit()
 

	
 
    def fix_default_user(self):
 
        """
 
        Fixes a old default user with some 'nicer' default values,
 
        used mostly for anonymous access
 
        """
 
        def_user = User.query().filter_by(is_default_user=True).one()
 

	
 
        def_user.name = 'Anonymous'
 
        def_user.lastname = 'User'
 
        def_user.email = 'anonymous@kallithea-scm.org'
 

	
 
        self.sa.commit()
 

	
 
    def fix_settings(self):
 
        """
 
        Fixes kallithea settings adds ga_code key for google analytics
 
        """
 

	
 
        hgsettings3 = Setting('ga_code', '')
 

	
 
        self.sa.add(hgsettings3)
 
        self.sa.commit()
 

	
 
    def admin_prompt(self, second=False):
 
        if not self.tests:
 
            import getpass
 

	
 
            username = self.cli_args.get('username')
 
            password = self.cli_args.get('password')
 
            email = self.cli_args.get('email')
 

	
 
            def get_password():
 
                password = getpass.getpass('Specify admin password '
 
                                           '(min 6 chars):')
 
                confirm = getpass.getpass('Confirm password:')
 

	
 
                if password != confirm:
 
                    log.error('passwords mismatch')
 
                    return False
 
                if len(password) < 6:
 
                    log.error('password is to short use at least 6 characters')
 
                    return False
 

	
 
                return password
 
            if username is None:
 
                username = raw_input('Specify admin username:')
 
                username = input('Specify admin username:')
 
            if password is None:
 
                password = get_password()
 
                if not password:
 
                    # second try
 
                    password = get_password()
 
                    if not password:
 
                        sys.exit()
 
            if email is None:
 
                email = raw_input('Specify admin email:')
 
                email = input('Specify admin email:')
 
            self.create_user(username, password, email, True)
 
        else:
 
            log.info('creating admin and regular test users')
 
            from kallithea.tests.base import TEST_USER_ADMIN_LOGIN, \
 
                TEST_USER_ADMIN_PASS, TEST_USER_ADMIN_EMAIL, \
 
                TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS, \
 
                TEST_USER_REGULAR_EMAIL, TEST_USER_REGULAR2_LOGIN, \
 
                TEST_USER_REGULAR2_PASS, TEST_USER_REGULAR2_EMAIL
 

	
 
            self.create_user(TEST_USER_ADMIN_LOGIN, TEST_USER_ADMIN_PASS,
 
                             TEST_USER_ADMIN_EMAIL, True)
 

	
 
            self.create_user(TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS,
 
                             TEST_USER_REGULAR_EMAIL, False)
 

	
 
            self.create_user(TEST_USER_REGULAR2_LOGIN, TEST_USER_REGULAR2_PASS,
 
                             TEST_USER_REGULAR2_EMAIL, False)
 

	
 
    def create_auth_plugin_options(self, skip_existing=False):
 
        """
 
        Create default auth plugin settings, and make it active
 

	
 
        :param skip_existing:
 
        """
 

	
 
        for k, v, t in [('auth_plugins', 'kallithea.lib.auth_modules.auth_internal', 'list'),
 
                        ('auth_internal_enabled', 'True', 'bool')]:
 
            if skip_existing and Setting.get_by_name(k) is not None:
 
                log.debug('Skipping option %s', k)
 
                continue
 
            setting = Setting(k, v, t)
 
            self.sa.add(setting)
 

	
 
    def create_default_options(self, skip_existing=False):
 
        """Creates default settings"""
 

	
 
        for k, v, t in [
 
            ('default_repo_enable_downloads', False, 'bool'),
 
            ('default_repo_enable_statistics', False, 'bool'),
 
            ('default_repo_private', False, 'bool'),
 
            ('default_repo_type', 'hg', 'unicode')
 
        ]:
 
            if skip_existing and Setting.get_by_name(k) is not None:
 
                log.debug('Skipping option %s', k)
 
                continue
 
            setting = Setting(k, v, t)
 
            self.sa.add(setting)
 

	
 
    def fixup_groups(self):
 
        def_usr = User.get_default_user()
 
        for g in RepoGroup.query().all():
 
            g.group_name = g.get_new_name(g.name)
 
            # get default perm
 
            default = UserRepoGroupToPerm.query() \
 
                .filter(UserRepoGroupToPerm.group == g) \
 
                .filter(UserRepoGroupToPerm.user == def_usr) \
 
                .scalar()
 

	
 
            if default is None:
 
                log.debug('missing default permission for group %s adding', g)
 
                RepoGroupModel()._create_default_perms(g)
 

	
 
    def reset_permissions(self, username):
 
        """
 
        Resets permissions to default state, useful when old systems had
 
        bad permissions, we must clean them up
 

	
 
        :param username:
 
        """
 
        default_user = User.get_by_username(username)
 
        if not default_user:
 
            return
 

	
 
        u2p = UserToPerm.query() \
 
            .filter(UserToPerm.user == default_user).all()
 
        fixed = False
 
        if len(u2p) != len(Permission.DEFAULT_USER_PERMISSIONS):
 
            for p in u2p:
 
                Session().delete(p)
 
            fixed = True
 
            self.populate_default_permissions()
 
        return fixed
 

	
 
    def update_repo_info(self):
 
        for repo in Repository.query():
 
            repo.update_changeset_cache()
 

	
 
    def prompt_repo_root_path(self, test_repo_path='', retries=3):
 
        _path = self.cli_args.get('repos_location')
 
        if retries == 3:
 
            log.info('Setting up repositories config')
 

	
 
        if _path is not None:
 
            path = _path
 
        elif not self.tests and not test_repo_path:
 
            path = raw_input(
 
            path = input(
 
                 'Enter a valid absolute path to store repositories. '
 
                 'All repositories in that path will be added automatically:'
 
            )
 
        else:
 
            path = test_repo_path
 
        path_ok = True
 

	
 
        # check proper dir
 
        if not os.path.isdir(path):
 
            path_ok = False
 
            log.error('Given path %s is not a valid directory', path)
 

	
 
        elif not os.path.isabs(path):
 
            path_ok = False
 
            log.error('Given path %s is not an absolute path', path)
 

	
 
        # check if path is at least readable.
 
        if not os.access(path, os.R_OK):
 
            path_ok = False
 
            log.error('Given path %s is not readable', path)
 

	
 
        # check write access, warn user about non writeable paths
 
        elif not os.access(path, os.W_OK) and path_ok:
 
            log.warning('No write permission to given path %s', path)
 
            if not self._ask_ok('Given path %s is not writeable, do you want to '
 
                          'continue with read only mode ? [y/n]' % (path,)):
 
                log.error('Canceled by user')
 
                sys.exit(-1)
 

	
 
        if retries == 0:
 
            sys.exit('max retries reached')
 
        if not path_ok:
 
            if _path is not None:
 
                sys.exit('Invalid repo path: %s' % _path)
 
            retries -= 1
 
            return self.prompt_repo_root_path(test_repo_path, retries) # recursing!!!
 

	
 
        real_path = os.path.normpath(os.path.realpath(path))
 

	
 
        if real_path != os.path.normpath(path):
 
            log.warning('Using normalized path %s instead of %s', real_path, path)
 

	
 
        return real_path
 

	
 
    def create_settings(self, repo_root_path):
 
        ui_config = [
 
            ('paths', '/', repo_root_path, True),
 
            #('phases', 'publish', 'false', False)
kallithea/lib/utils2.py
Show inline comments
 
@@ -557,57 +557,57 @@ class Optional(object):
 
        """
 
        if isinstance(val, cls):
 
            return val.getval()
 
        return val
 

	
 

	
 
def urlreadable(s, _cleanstringsub=re.compile('[^-a-zA-Z0-9./]+').sub):
 
    return _cleanstringsub('_', safe_str(s)).rstrip('_')
 

	
 

	
 
def recursive_replace(str_, replace=' '):
 
    """
 
    Recursive replace of given sign to just one instance
 

	
 
    :param str_: given string
 
    :param replace: char to find and replace multiple instances
 

	
 
    Examples::
 
    >>> recursive_replace("Mighty---Mighty-Bo--sstones",'-')
 
    'Mighty-Mighty-Bo-sstones'
 
    """
 

	
 
    if str_.find(replace * 2) == -1:
 
        return str_
 
    else:
 
        str_ = str_.replace(replace * 2, replace)
 
        return recursive_replace(str_, replace)
 

	
 

	
 
def repo_name_slug(value):
 
    """
 
    Return slug of name of repository
 
    This function is called on each creation/modification
 
    of repository to prevent bad names in repo
 
    """
 

	
 
    slug = remove_formatting(value)
 
    slug = strip_tags(slug)
 

	
 
    for c in r"""`?=[]\;'"<>,/~!@#$%^&*()+{}|: """:
 
        slug = slug.replace(c, '-')
 
    slug = recursive_replace(slug, '-')
 
    slug = collapse(slug, '-')
 
    return slug
 

	
 

	
 
def ask_ok(prompt, retries=4, complaint='Yes or no please!'):
 
    while True:
 
        ok = raw_input(prompt)
 
        ok = input(prompt)
 
        if ok in ('y', 'ye', 'yes'):
 
            return True
 
        if ok in ('n', 'no', 'nop', 'nope'):
 
            return False
 
        retries = retries - 1
 
        if retries < 0:
 
            raise IOError
 
        print(complaint)
0 comments (0 inline, 0 general)