Changeset - 2afe9320d5e6
[Not reviewed]
beta
0 7 0
Marcin Kuzminski - 14 years ago 2011-10-08 03:00:03
marcin@python-works.com
updated docstrings
7 files changed with 67 insertions and 19 deletions:
0 comments (0 inline, 0 general)
rhodecode/lib/auth.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
"""
 
    rhodecode.lib.auth
 
    ~~~~~~~~~~~~~~~~~~
 

	
 
    authentication and permission libraries
 

	
 
    :created_on: Apr 4, 2010
 
    :copyright: (c) 2010 by marcink.
 
    :license: LICENSE_NAME, see LICENSE_FILE for more details.
 
    :copyright: (C) 2009-2011 Marcin Kuzminski <marcin@python-works.com>
 
    :license: GPLv3, see COPYING for more details.
 
"""
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
import random
 
import logging
 
import traceback
 
import hashlib
 

	
 
from tempfile import _RandomNameSequence
 
from decorator import decorator
 

	
 
from pylons import config, session, url, request
 
from pylons.controllers.util import abort, redirect
 
from pylons.i18n.translation import _
 

	
 
from rhodecode import __platform__, PLATFORM_WIN, PLATFORM_OTHERS
 

	
 
if __platform__ in PLATFORM_WIN:
 
    from hashlib import sha256
 
if __platform__ in PLATFORM_OTHERS:
 
    import bcrypt
 

	
 
from rhodecode.lib import str2bool, safe_unicode
 
from rhodecode.lib.exceptions import LdapPasswordError, LdapUsernameError
 
from rhodecode.lib.utils import get_repo_slug
 
from rhodecode.lib.auth_ldap import AuthLdap
 

	
 
from rhodecode.model import meta
 
from rhodecode.model.user import UserModel
 
from rhodecode.model.db import Permission, RhodeCodeSettings, User
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class PasswordGenerator(object):
 
    """This is a simple class for generating password from
 
        different sets of characters
 
        usage:
 
        passwd_gen = PasswordGenerator()
 
        #print 8-letter password containing only big and small letters
 
            of alphabet
 
        print passwd_gen.gen_password(8, passwd_gen.ALPHABETS_BIG_SMALL)
 
    """
 
    ALPHABETS_NUM = r'''1234567890'''
 
    ALPHABETS_SMALL = r'''qwertyuiopasdfghjklzxcvbnm'''
 
    ALPHABETS_BIG = r'''QWERTYUIOPASDFGHJKLZXCVBNM'''
 
    ALPHABETS_SPECIAL = r'''`-=[]\;',./~!@#$%^&*()_+{}|:"<>?'''
 
    ALPHABETS_FULL = ALPHABETS_BIG + ALPHABETS_SMALL \
 
        + ALPHABETS_NUM + ALPHABETS_SPECIAL
 
    ALPHABETS_ALPHANUM = ALPHABETS_BIG + ALPHABETS_SMALL + ALPHABETS_NUM
 
    ALPHABETS_BIG_SMALL = ALPHABETS_BIG + ALPHABETS_SMALL
 
    ALPHABETS_ALPHANUM_BIG = ALPHABETS_BIG + ALPHABETS_NUM
 
    ALPHABETS_ALPHANUM_SMALL = ALPHABETS_SMALL + ALPHABETS_NUM
 

	
 
    def __init__(self, passwd=''):
 
        self.passwd = passwd
 

	
 
    def gen_password(self, len, type):
 
        self.passwd = ''.join([random.choice(type) for _ in xrange(len)])
 
        return self.passwd
 

	
 

	
 
class RhodeCodeCrypto(object):
 

	
 
    @classmethod
 
    def hash_string(cls, str_):
 
        """
 
        Cryptographic function used for password hashing based on pybcrypt
 
        or pycrypto in windows
 

	
 
        :param password: password to hash
 
        """
 
        if __platform__ in PLATFORM_WIN:
 
            return sha256(str_).hexdigest()
 
        elif __platform__ in PLATFORM_OTHERS:
 
            return bcrypt.hashpw(str_, bcrypt.gensalt(10))
 
        else:
 
            raise Exception('Unknown or unsupported platform %s' \
 
                            % __platform__)
 

	
 
    @classmethod
 
    def hash_check(cls, password, hashed):
 
        """
 
        Checks matching password with it's hashed value, runs different
 
        implementation based on platform it runs on
rhodecode/lib/backup_manager.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
"""
 
    rhodecode.lib.backup_manager
 
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
    Mercurial repositories backup manager, it allows to backups all 
 
    repositories and send it to backup server using RSA key via ssh.
 

	
 
    :created_on: Feb 28, 2010
 
    :copyright: (c) 2010 by marcink.
 
    :license: LICENSE_NAME, see LICENSE_FILE for more details.
 
    :copyright: (C) 2009-2011 Marcin Kuzminski <marcin@python-works.com>
 
    :license: GPLv3, see COPYING for more details.
 
"""
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
import os
 
import sys
 

	
 
import logging
 
import tarfile
 
import datetime
 
import subprocess
 

	
 
logging.basicConfig(level=logging.DEBUG,
 
                    format="%(asctime)s %(levelname)-5.5s %(message)s")
 

	
 

	
 
class BackupManager(object):
 
    def __init__(self, repos_location, rsa_key, backup_server):
 
        today = datetime.datetime.now().weekday() + 1
 
        self.backup_file_name = "mercurial_repos.%s.tar.gz" % today
 

	
 
        self.id_rsa_path = self.get_id_rsa(rsa_key)
 
        self.repos_path = self.get_repos_path(repos_location)
 
        self.backup_server = backup_server
 

	
 
        self.backup_file_path = '/tmp'
 

	
 
        logging.info('starting backup for %s', self.repos_path)
 
        logging.info('backup target %s', self.backup_file_path)
 

	
 
    def get_id_rsa(self, rsa_key):
 
        if not os.path.isfile(rsa_key):
 
            logging.error('Could not load id_rsa key file in %s', rsa_key)
 
            sys.exit()
 
        return rsa_key
 

	
 
    def get_repos_path(self, path):
 
        if not os.path.isdir(path):
 
            logging.error('Wrong location for repositories in %s', path)
 
            sys.exit()
 
        return path
 

	
 
    def backup_repos(self):
 
        bckp_file = os.path.join(self.backup_file_path, self.backup_file_name)
 
        tar = tarfile.open(bckp_file, "w:gz")
 

	
 
        for dir_name in os.listdir(self.repos_path):
 
            logging.info('backing up %s', dir_name)
 
            tar.add(os.path.join(self.repos_path, dir_name), dir_name)
 
        tar.close()
 
        logging.info('finished backup of mercurial repositories')
 

	
 
    def transfer_files(self):
 
        params = {
 
                  'id_rsa_key': self.id_rsa_path,
 
                  'backup_file': os.path.join(self.backup_file_path,
 
                                             self.backup_file_name),
 
                  'backup_server': self.backup_server
 
                  }
 
        cmd = ['scp', '-l', '40000', '-i', '%(id_rsa_key)s' % params,
 
               '%(backup_file)s' % params,
 
               '%(backup_server)s' % params]
 

	
 
        subprocess.call(cmd)
 
        logging.info('Transfered file %s to %s', self.backup_file_name, cmd[4])
 

	
 
    def rm_file(self):
 
        logging.info('Removing file %s', self.backup_file_name)
 
        os.remove(os.path.join(self.backup_file_path, self.backup_file_name))
 

	
 
if __name__ == "__main__":
 

	
 
    repo_location = '/home/repo_path'
 
    backup_server = 'root@192.168.1.100:/backups/mercurial'
 
    rsa_key = '/home/id_rsa'
 

	
 
    B_MANAGER = BackupManager(repo_location, rsa_key, backup_server)
 
    B_MANAGER.backup_repos()
 
    B_MANAGER.transfer_files()
 
    B_MANAGER.rm_file()
rhodecode/lib/exceptions.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
"""
 
    rhodecode.lib.exceptions
 
    ~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
    Set of custom exceptions used in RhodeCode
 

	
 
    :created_on: Nov 17, 2010
 
    :copyright: (c) 2010 by marcink.
 
    :license: LICENSE_NAME, see LICENSE_FILE for more details.
 
    :copyright: (C) 2009-2011 Marcin Kuzminski <marcin@python-works.com>
 
    :license: GPLv3, see COPYING for more details.
 
"""
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 

	
 
class LdapUsernameError(Exception):
 
    pass
 

	
 

	
 
class LdapPasswordError(Exception):
 
    pass
 

	
 

	
 
class LdapConnectionError(Exception):
 
    pass
 

	
 

	
 
class LdapImportError(Exception):
 
    pass
 

	
 

	
 
class DefaultUserException(Exception):
 
    pass
 

	
 

	
 
class UserOwnsReposException(Exception):
 
    pass
 

	
 
class UsersGroupsAssignedException(Exception):
 
    pass
rhodecode/lib/smtp_mailer.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
"""
 
    rhodecode.lib.smtp_mailer
 
    ~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
    Simple smtp mailer used in RhodeCode
 

	
 
    :created_on: Sep 13, 2010
 
    :copyright: (c) 2011 by marcink.
 
    :license: LICENSE_NAME, see LICENSE_FILE for more details.
 
    :copyright: (C) 2009-2011 Marcin Kuzminski <marcin@python-works.com>
 
    :license: GPLv3, see COPYING for more details.
 
"""
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
import logging
 
import smtplib
 
import mimetypes
 
from socket import sslerror
 

	
 
from email.mime.multipart import MIMEMultipart
 
from email.mime.image import MIMEImage
 
from email.mime.audio import MIMEAudio
 
from email.mime.base import MIMEBase
 
from email.mime.text import MIMEText
 
from email.utils import formatdate
 
from email import encoders
 

	
 

	
 
class SmtpMailer(object):
 
    """SMTP mailer class
 

	
 
    mailer = SmtpMailer(mail_from, user, passwd, mail_server,
 
                        mail_port, ssl, tls)
 
    mailer.send(recipients, subject, body, attachment_files)
 

	
 
    :param recipients might be a list of string or single string
 
    :param attachment_files is a dict of {filename:location}
 
        it tries to guess the mimetype and attach the file
 

	
 
    """
 

	
 
    def __init__(self, mail_from, user, passwd, mail_server,
 
                    mail_port=None, ssl=False, tls=False, debug=False):
 

	
 
        self.mail_from = mail_from
 
        self.mail_server = mail_server
 
        self.mail_port = mail_port
 
        self.user = user
 
        self.passwd = passwd
 
        self.ssl = ssl
 
        self.tls = tls
 
        self.debug = debug
 

	
 
    def send(self, recipients=[], subject='', body='', attachment_files=None):
 

	
 
        if isinstance(recipients, basestring):
 
            recipients = [recipients]
 
        if self.ssl:
 
            smtp_serv = smtplib.SMTP_SSL(self.mail_server, self.mail_port)
 
        else:
 
            smtp_serv = smtplib.SMTP(self.mail_server, self.mail_port)
 

	
 
        if self.tls:
 
            smtp_serv.ehlo()
 
            smtp_serv.starttls()
 

	
 
        if self.debug:
 
            smtp_serv.set_debuglevel(1)
 

	
 
        smtp_serv.ehlo()
 

	
 
        #if server requires authorization you must provide login and password
 
        #but only if we have them
 
        if self.user and self.passwd:
 
            smtp_serv.login(self.user, self.passwd)
 

	
 
        date_ = formatdate(localtime=True)
 
        msg = MIMEMultipart()
 
        msg.set_type('multipart/alternative')
 
        msg.preamble = 'You will not see this in a MIME-aware mail reader.\n'
 

	
 
        text_msg = MIMEText(body)
 
        text_msg.set_type('text/plain')
 
        text_msg.set_param('charset', 'UTF-8')
 

	
 
        msg['From'] = self.mail_from
 
        msg['To'] = ','.join(recipients)
 
        msg['Date'] = date_
 
        msg['Subject'] = subject
 

	
 
        msg.attach(text_msg)
 

	
 
        if attachment_files:
 
            self.__atach_files(msg, attachment_files)
 

	
 
        smtp_serv.sendmail(self.mail_from, recipients, msg.as_string())
 
        logging.info('MAIL SEND TO: %s' % recipients)
 

	
 
        try:
 
            smtp_serv.quit()
 
        except sslerror:
 
            # sslerror is raised in tls connections on closing sometimes
 
            pass
 

	
 
    def __atach_files(self, msg, attachment_files):
 
        if isinstance(attachment_files, dict):
 
            for f_name, msg_file in attachment_files.items():
 
                ctype, encoding = mimetypes.guess_type(f_name)
 
                logging.info("guessing file %s type based on %s", ctype,
rhodecode/tests/test_concurency.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
"""
 
    rhodecode.tests.test_hg_operations
 
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
    Test suite for making push/pull operations
 

	
 
    :created_on: Dec 30, 2010
 
    :copyright: (c) 2010 by marcink.
 
    :license: LICENSE_NAME, see LICENSE_FILE for more details.
 
    :copyright: (C) 2009-2011 Marcin Kuzminski <marcin@python-works.com>
 
    :license: GPLv3, see COPYING for more details.
 
"""
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
import os
 
import sys
 
import shutil
 
import logging
 
from os.path import join as jn
 
from os.path import dirname as dn
 

	
 
from tempfile import _RandomNameSequence
 
from subprocess import Popen, PIPE
 

	
 
from paste.deploy import appconfig
 
from pylons import config
 
from sqlalchemy import engine_from_config
 

	
 
from rhodecode.lib.utils import add_cache
 
from rhodecode.model import init_model
 
from rhodecode.model import meta
 
from rhodecode.model.db import User, Repository
 
from rhodecode.lib.auth import get_crypt_password
 

	
 
from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO
 
from rhodecode.config.environment import load_environment
 

	
 
rel_path = dn(dn(dn(os.path.abspath(__file__))))
 
conf = appconfig('config:development.ini', relative_to=rel_path)
 
load_environment(conf.global_conf, conf.local_conf)
 

	
 
add_cache(conf)
 

	
 
USER = 'test_admin'
 
PASS = 'test12'
 
HOST = '127.0.0.1:5000'
 
DEBUG = True
 
log = logging.getLogger(__name__)
 

	
 

	
 
class Command(object):
 

	
 
    def __init__(self, cwd):
 
        self.cwd = cwd
 

	
 
    def execute(self, cmd, *args):
 
        """Runs command on the system with given ``args``.
 
        """
 

	
 
        command = cmd + ' ' + ' '.join(args)
 
        log.debug('Executing %s' % command)
 
        if DEBUG:
 
            print command
 
        p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd)
 
        stdout, stderr = p.communicate()
 
        if DEBUG:
 
            print stdout, stderr
 
        return stdout, stderr
 

	
 
def get_session():
 
    engine = engine_from_config(conf, 'sqlalchemy.db1.')
 
    init_model(engine)
 
    sa = meta.Session()
 
    return sa
 

	
 

	
 
def create_test_user(force=True):
 
    print 'creating test user'
 
    sa = get_session()
 

	
 
    user = sa.query(User).filter(User.username == USER).scalar()
 

	
 
    if force and user is not None:
 
        print 'removing current user'
 
        for repo in sa.query(Repository).filter(Repository.user == user).all():
 
            sa.delete(repo)
 
        sa.delete(user)
 
        sa.commit()
 

	
 
    if user is None or force:
 
        print 'creating new one'
 
        new_usr = User()
 
        new_usr.username = USER
 
        new_usr.password = get_crypt_password(PASS)
 
        new_usr.email = 'mail@mail.com'
 
        new_usr.name = 'test'
 
        new_usr.lastname = 'lasttestname'
 
        new_usr.active = True
 
        new_usr.admin = True
 
        sa.add(new_usr)
 
        sa.commit()
 

	
 
    print 'done'
 

	
 

	
 
def create_test_repo(force=True):
 
    print 'creating test repo'
 
    from rhodecode.model.repo import RepoModel
 
    sa = get_session()
 

	
 
    user = sa.query(User).filter(User.username == USER).scalar()
 
    if user is None:
 
        raise Exception('user not found')
 

	
 

	
 
    repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar()
 

	
 
    if repo is None:
 
        print 'repo not found creating'
 

	
 
        form_data = {'repo_name':HG_REPO,
 
                     'repo_type':'hg',
 
                     'private':False,
 
                     'clone_uri':'' }
 
        rm = RepoModel(sa)
 
        rm.base_path = '/home/hg'
 
        rm.create(form_data, user)
 
    
 

	
 
    print 'done'
 

	
 
def set_anonymous_access(enable=True):
 
    sa = get_session()
 
    user = sa.query(User).filter(User.username == 'default').one()
 
    user.active = enable
 
    sa.add(user)
 
    sa.commit()
 

	
 
def get_anonymous_access():
 
    sa = get_session()
 
    return sa.query(User).filter(User.username == 'default').one().active
 

	
 

	
 
#==============================================================================
 
# TESTS
 
#==============================================================================
 
def test_clone_with_credentials(no_errors=False,repo=HG_REPO):
 
def test_clone_with_credentials(no_errors=False, repo=HG_REPO):
 
    cwd = path = jn(TESTS_TMP_PATH, repo)
 

	
 
    
 

	
 
    try:
 
        shutil.rmtree(path, ignore_errors=True)
 
        os.makedirs(path)
 
        #print 'made dirs %s' % jn(path)
 
    except OSError:
 
        raise
 

	
 

	
 
    clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \
 
                  {'user':USER,
 
                   'pass':PASS,
 
                   'host':HOST,
 
                   'cloned_repo':repo,
 
                   'dest':path+_RandomNameSequence().next()}
 
                   'dest':path + _RandomNameSequence().next()}
 

	
 
    stdout, stderr = Command(cwd).execute('hg clone', clone_url)
 

	
 
    if no_errors is False:
 
        assert """adding file changes""" in stdout, 'no messages about cloning'
 
        assert """abort""" not in stderr , 'got error from clone'
 

	
 
if __name__ == '__main__':
 
    try:
 
        create_test_user(force=False)
 

	
 
        for i in range(int(sys.argv[2])):
 
            test_clone_with_credentials(repo=sys.argv[1])
 

	
 
    except Exception,e:
 
    except Exception, e:
 
        sys.exit('stop on %s' % e)
rhodecode/tests/test_hg_operations.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
"""
 
    rhodecode.tests.test_hg_operations
 
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
    Test suite for making push/pull operations
 

	
 
    :created_on: Dec 30, 2010
 
    :copyright: (c) 2010 by marcink.
 
    :license: LICENSE_NAME, see LICENSE_FILE for more details.
 
    :copyright: (C) 2009-2011 Marcin Kuzminski <marcin@python-works.com>
 
    :license: GPLv3, see COPYING for more details.
 
"""
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
import os
 
import time
 
import sys
 
import shutil
 
import logging
 

	
 
from os.path import join as jn
 
from os.path import dirname as dn
 

	
 
from tempfile import _RandomNameSequence
 
from subprocess import Popen, PIPE
 

	
 
from paste.deploy import appconfig
 
from pylons import config
 
from sqlalchemy import engine_from_config
 

	
 
from rhodecode.lib.utils import add_cache
 
from rhodecode.model import init_model
 
from rhodecode.model import meta
 
from rhodecode.model.db import User, Repository, UserLog
 
from rhodecode.lib.auth import get_crypt_password
 

	
 
from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO
 
from rhodecode.config.environment import load_environment
 

	
 
rel_path = dn(dn(dn(os.path.abspath(__file__))))
 
conf = appconfig('config:development.ini', relative_to=rel_path)
 
load_environment(conf.global_conf, conf.local_conf)
 

	
 
add_cache(conf)
 

	
 
USER = 'test_admin'
 
PASS = 'test12'
 
HOST = '127.0.0.1:5000'
 
DEBUG = True if sys.argv[1:] else False
 
print 'DEBUG:', DEBUG
 
log = logging.getLogger(__name__)
 

	
 

	
 
class Command(object):
 

	
 
    def __init__(self, cwd):
 
        self.cwd = cwd
 

	
 
    def execute(self, cmd, *args):
 
        """Runs command on the system with given ``args``.
 
        """
 

	
 
        command = cmd + ' ' + ' '.join(args)
 
        log.debug('Executing %s' % command)
 
        if DEBUG:
 
            print command
 
        p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd)
 
        stdout, stderr = p.communicate()
 
        if DEBUG:
 
            print stdout, stderr
 
        return stdout, stderr
 

	
 

	
 
def test_wrapp(func):
 

	
 
    def __wrapp(*args, **kwargs):
 
        print '>>>%s' % func.__name__
 
        try:
 
            res = func(*args, **kwargs)
 
        except Exception, e:
 
            print ('###############\n-'
 
                   '--%s failed %s--\n'
 
                   '###############\n' % (func.__name__, e))
 
            sys.exit()
 
        print '++OK++'
 
        return res
 
    return __wrapp
 

	
 
def get_session():
 
    engine = engine_from_config(conf, 'sqlalchemy.db1.')
 
    init_model(engine)
 
    sa = meta.Session
 
    return sa
 

	
 

	
 
def create_test_user(force=True):
 
    print '\tcreating test user'
 
    sa = get_session()
 

	
 
    user = sa.query(User).filter(User.username == USER).scalar()
 

	
 
    if force and user is not None:
 
        print '\tremoving current user'
 
        for repo in sa.query(Repository).filter(Repository.user == user).all():
 
            sa.delete(repo)
 
        sa.delete(user)
 
        sa.commit()
 

	
 
    if user is None or force:
rhodecode/tests/test_libs.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
"""
 
    rhodecode.tests.test_libs
 
    ~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 

	
 
    Package for testing various lib/helper functions in rhodecode
 
    
 
    :created_on: Jun 9, 2011
 
    :copyright: (c) 2011 by marcink.
 
    :license: LICENSE_NAME, see LICENSE_FILE for more details.
 
    :copyright: (C) 2009-2011 Marcin Kuzminski <marcin@python-works.com>
 
    :license: GPLv3, see COPYING for more details.
 
"""
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 

	
 

	
 
import unittest
 
from rhodecode.tests import *
 

	
 
proto = 'http'
 
TEST_URLS = [
 
    ('%s://127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
 
     '%s://127.0.0.1' % proto),
 
    ('%s://marcink@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
 
     '%s://127.0.0.1' % proto),
 
    ('%s://marcink:pass@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
 
     '%s://127.0.0.1' % proto),
 
    ('%s://127.0.0.1:8080' % proto, ['%s://' % proto, '127.0.0.1', '8080'],
 
     '%s://127.0.0.1:8080' % proto),
 
    ('%s://domain.org' % proto, ['%s://' % proto, 'domain.org'],
 
     '%s://domain.org' % proto),
 
    ('%s://user:pass@domain.org:8080' % proto, ['%s://' % proto, 'domain.org',
 
                                                '8080'],
 
     '%s://domain.org:8080' % proto),
 
]
 

	
 
proto = 'https'
 
TEST_URLS += [
 
    ('%s://127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
 
     '%s://127.0.0.1' % proto),
 
    ('%s://marcink@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
 
     '%s://127.0.0.1' % proto),
 
    ('%s://marcink:pass@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
 
     '%s://127.0.0.1' % proto),
 
    ('%s://127.0.0.1:8080' % proto, ['%s://' % proto, '127.0.0.1', '8080'],
 
     '%s://127.0.0.1:8080' % proto),
 
    ('%s://domain.org' % proto, ['%s://' % proto, 'domain.org'],
 
     '%s://domain.org' % proto),
 
    ('%s://user:pass@domain.org:8080' % proto, ['%s://' % proto, 'domain.org',
 
                                                '8080'],
 
     '%s://domain.org:8080' % proto),
 
]
 

	
 

	
 
class TestLibs(unittest.TestCase):
 

	
 

	
 
    def test_uri_filter(self):
 
        from rhodecode.lib import uri_filter
 

	
 
        for url in TEST_URLS:
 
            self.assertEqual(uri_filter(url[0]), url[1])
 

	
 
    def test_credentials_filter(self):
 
        from rhodecode.lib import credentials_filter
 

	
 
        for url in TEST_URLS:
 
            self.assertEqual(credentials_filter(url[0]), url[2])
 

	
 

	
 
    def test_str2bool(self):
 
        from rhodecode.lib import str2bool
 
        test_cases = [
 
            ('t', True),
 
            ('true', True),
 
            ('y', True),
 
            ('yes', True),
 
            ('on', True),
 
            ('1', True),
 
            ('Y', True),
 
            ('yeS', True),
 
            ('Y', True),
 
            ('TRUE', True),
 
            ('T', True),
 
            ('False', False),
 
            ('F', False),
 
            ('FALSE', False),
 
            ('0', False),
 
            ('-1', False),
 
            ('', False), ]
 

	
 
        for case in test_cases:
 
            self.assertEqual(str2bool(case[0]), case[1])
 

	
0 comments (0 inline, 0 general)