Changeset - 95e149edc46c
[Not reviewed]
default
0 7 0
Mads Kiilerich - 8 years ago 2017-06-13 01:11:31
mads@kiilerich.com
sqlalchemy: fix warnings from running the test suite

Mainly warnings about strings being passed where unicode was expected.
7 files changed with 31 insertions and 26 deletions:
0 comments (0 inline, 0 general)
kallithea/controllers/api/api.py
Show inline comments
 
@@ -596,13 +596,13 @@ class ApiController(JSONRPCController):
 
                .order_by(User.username)
 
                .filter_by(is_default_user=False)
 
        ]
 

	
 
    @HasPermissionAnyDecorator('hg.admin')
 
    def create_user(self, username, email, password=Optional(''),
 
                    firstname=Optional(''), lastname=Optional(''),
 
                    firstname=Optional(u''), lastname=Optional(u''),
 
                    active=Optional(True), admin=Optional(False),
 
                    extern_type=Optional(User.DEFAULT_AUTH_TYPE),
 
                    extern_name=Optional('')):
 
        """
 
        Creates new user. Returns new user object. This command can
 
        be executed only using api_key belonging to user with admin rights.
 
@@ -849,13 +849,13 @@ class ApiController(JSONRPCController):
 
        return [
 
            user_group.get_api_data()
 
            for user_group in UserGroupList(UserGroup.query().all(), perm_level='read')
 
        ]
 

	
 
    @HasPermissionAnyDecorator('hg.admin', 'hg.usergroup.create.true')
 
    def create_user_group(self, group_name, description=Optional(''),
 
    def create_user_group(self, group_name, description=Optional(u''),
 
                          owner=Optional(OAttr('apiuser')), active=Optional(True)):
 
        """
 
        Creates new user group. This command can be executed only using api_key
 
        belonging to user with admin rights or an user who has create user group
 
        permission
 

	
 
@@ -2520,13 +2520,13 @@ class ApiController(JSONRPCController):
 
            raise JSONRPCError('pull request `%s` does not exist' % (pullrequest_id,))
 
        if not HasRepoPermissionLevel('read')(pull_request.org_repo.repo_name):
 
            raise JSONRPCError('not allowed')
 
        return pull_request.get_api_data()
 

	
 
    # permission check inside
 
    def comment_pullrequest(self, pull_request_id, comment_msg='', status=None, close_pr=False):
 
    def comment_pullrequest(self, pull_request_id, comment_msg=u'', status=None, close_pr=False):
 
        """
 
        Add comment, close and change status of pull request.
 
        """
 
        apiuser = get_user_or_error(request.authuser.user_id)
 
        pull_request = PullRequest.get(pull_request_id)
 
        if pull_request is None:
kallithea/model/pull_request.py
Show inline comments
 
@@ -127,12 +127,14 @@ class PullRequestModel(object):
 

	
 
        log.debug("Mentioning %s", mention_recipients)
 
        self.add_reviewers(user, pr, set(), mention_recipients)
 

	
 
    def remove_reviewers(self, user, pull_request, reviewers):
 
        """Remove specified users from being reviewers of the PR."""
 
        if not reviewers:
 
            return # avoid SQLAlchemy warning about empty sequence for IN-predicate
 

	
 
        PullRequestReviewer.query() \
 
            .filter_by(pull_request=pull_request) \
 
            .filter(PullRequestReviewer.user_id.in_(r.user_id for r in reviewers)) \
 
            .delete(synchronize_session='fetch') # the default of 'evaluate' is not available
 

	
kallithea/tests/api/api_base.py
Show inline comments
 
@@ -2507,13 +2507,13 @@ class _BaseTestApi(object):
 
                                  repoid=self.REPO, raw_id = self.TEST_REVISION)
 
        response = api_call(self, params)
 
        expected = u'Access denied to repo %s' % self.REPO
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_pullrequest(self):
 
        pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'get test')
 
        pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, u'get test')
 
        random_id = random.randrange(1, 9999)
 
        params = json.dumps({
 
            "id": random_id,
 
            "api_key": self.apikey,
 
            "method": 'get_pullrequest',
 
            "args": {"pullrequest_id": pull_request_id},
 
@@ -2538,13 +2538,13 @@ class _BaseTestApi(object):
 
        }
 
        self._compare_ok(random_id, expected,
 
                         given=re.sub("\d\d\d\d\-\d\d\-\d\dT\d\d\:\d\d\:\d\d\.\d\d\d",
 
                                      "2000-01-01T00:00:00.000", response.body))
 

	
 
    def test_api_close_pullrequest(self):
 
        pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'close test')
 
        pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, u'close test')
 
        random_id = random.randrange(1, 9999)
 
        params = json.dumps({
 
            "id": random_id,
 
            "api_key": self.apikey,
 
            "method": "comment_pullrequest",
 
            "args": {"pull_request_id": pull_request_id, "close_pr": True},
 
@@ -2554,13 +2554,13 @@ class _BaseTestApi(object):
 
        pullrequest = PullRequest().get(pull_request_id)
 
        assert pullrequest.comments[-1].text == ''
 
        assert pullrequest.status == PullRequest.STATUS_CLOSED
 
        assert pullrequest.is_closed() == True
 

	
 
    def test_api_status_pullrequest(self):
 
        pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, "status test")
 
        pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, u"status test")
 

	
 
        random_id = random.randrange(1, 9999)
 
        params = json.dumps({
 
            "id": random_id,
 
            "api_key": User.get_by_username(TEST_USER_REGULAR2_LOGIN).api_key,
 
            "method": "comment_pullrequest",
 
@@ -2579,13 +2579,13 @@ class _BaseTestApi(object):
 
        response = api_call(self, params)
 
        self._compare_ok(random_id, True, given=response.body)
 
        pullrequest = PullRequest().get(pull_request_id)
 
        assert ChangesetStatus.STATUS_APPROVED == ChangesetStatusModel().calculate_pull_request_result(pullrequest)[2]
 

	
 
    def test_api_comment_pullrequest(self):
 
        pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, "comment test")
 
        pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, u"comment test")
 
        random_id = random.randrange(1, 9999)
 
        params = json.dumps({
 
            "id": random_id,
 
            "api_key": self.apikey,
 
            "method": "comment_pullrequest",
 
            "args": {"pull_request_id": pull_request_id, "comment_msg": "Looks good to me"},
kallithea/tests/fixture.py
Show inline comments
 
@@ -322,23 +322,23 @@ class Fixture(object):
 
    def review_changeset(self, repo, revision, status, author=TEST_USER_ADMIN_LOGIN):
 
        comment = ChangesetCommentsModel().create(u"review comment", repo, author, revision=revision, send_email=False)
 
        csm = ChangesetStatusModel().set_status(repo, ChangesetStatus.STATUS_APPROVED, author, comment, revision=revision)
 
        Session().commit()
 
        return csm
 

	
 
    def create_pullrequest(self, testcontroller, repo_name, pr_src_rev, pr_dst_rev, title='title'):
 
    def create_pullrequest(self, testcontroller, repo_name, pr_src_rev, pr_dst_rev, title=u'title'):
 
        org_ref = 'branch:stable:%s' % pr_src_rev
 
        other_ref = 'branch:default:%s' % pr_dst_rev
 
        with test_context(testcontroller.app): # needed to be able to mock request user
 
            org_repo = other_repo = Repository.get_by_repo_name(repo_name)
 
            owner_user = User.get_by_username(TEST_USER_ADMIN_LOGIN)
 
            reviewers = [User.get_by_username(TEST_USER_REGULAR_LOGIN)]
 
            request.authuser = request.user = AuthUser(dbuser=owner_user)
 
            # creating a PR sends a message with an absolute URL - without routing that requires mocking
 
            with mock.patch.object(helpers, 'url', (lambda arg, qualified=False, **kwargs: ('https://localhost' if qualified else '') + '/fake/' + arg)):
 
                cmd = CreatePullRequestAction(org_repo, other_repo, org_ref, other_ref, title, 'No description', owner_user, reviewers)
 
                cmd = CreatePullRequestAction(org_repo, other_repo, org_ref, other_ref, title, u'No description', owner_user, reviewers)
 
                pull_request = cmd.execute()
 
            Session().commit()
 
        return pull_request.pull_request_id
 

	
 

	
 
#==============================================================================
kallithea/tests/functional/test_admin_repo_groups.py
Show inline comments
 
@@ -8,13 +8,13 @@ from kallithea.tests.fixture import Fixt
 
fixture = Fixture()
 

	
 
class TestRepoGroupsController(TestController):
 

	
 
    def test_case_insensitivity(self):
 
        self.log_user()
 
        group_name = 'newgroup'
 
        group_name = u'newgroup'
 
        response = self.app.post(url('repos_groups'),
 
                                 fixture._get_repo_group_create_params(group_name=group_name,
 
                                                                 _authentication_token=self.authentication_token()))
 
        # try to create repo group with swapped case
 
        swapped_group_name = group_name.swapcase()
 
        response = self.app.post(url('repos_groups'),
kallithea/tests/models/test_permissions.py
Show inline comments
 
@@ -49,12 +49,15 @@ class TestPermissions(TestController):
 
            RepoModel().delete(repo=self.test_repo)
 

	
 
        UserModel().delete(self.u1)
 
        UserModel().delete(self.u2)
 
        UserModel().delete(self.u3)
 
        UserModel().delete(self.a1)
 

	
 
        Session().commit() # commit early to avoid SQLAlchemy warning from double cascade delete to users_groups_members
 

	
 
        if hasattr(self, 'g1'):
 
            RepoGroupModel().delete(self.g1.group_id)
 
        if hasattr(self, 'g2'):
 
            RepoGroupModel().delete(self.g2.group_id)
 

	
 
        if hasattr(self, 'ug2'):
kallithea/tests/other/test_auth_ldap.py
Show inline comments
 
@@ -18,14 +18,14 @@ def arrange_ldap_auth(set_test_settings)
 
class _AuthLdapMock():
 

	
 
    def __init__(self, **kwargs):
 
        pass
 

	
 
    def authenticate_ldap(self, username, password):
 
        return 'spam dn', dict(test_ldap_firstname=['spam ldap first name'],
 
                               test_ldap_lastname=['spam ldap last name'],
 
        return 'spam dn', dict(test_ldap_firstname=[u'spam ldap first name'],
 
                               test_ldap_lastname=[u'spam ldap last name'],
 
                               test_ldap_email=['spam ldap email'])
 

	
 

	
 
def test_update_user_attributes_from_ldap(monkeypatch, create_test_user,
 
                                          arrange_ldap_auth):
 
    """Authenticate user with mocked LDAP, verify attributes are updated.
 
@@ -35,14 +35,14 @@ def test_update_user_attributes_from_lda
 
    uniqifier = uuid.uuid4()
 
    username = 'test-user-{0}'.format(uniqifier)
 
    assert User.get_by_username(username) is None
 
    user_input = dict(username='test-user-{0}'.format(uniqifier),
 
                      password='spam password',
 
                      email='spam-email-{0}'.format(uniqifier),
 
                      firstname='spam first name',
 
                      lastname='spam last name',
 
                      firstname=u'spam first name',
 
                      lastname=u'spam last name',
 
                      active=True,
 
                      admin=False)
 
    user = create_test_user(user_input)
 

	
 
    # Arrange LDAP auth.
 
    monkeypatch.setattr(auth_ldap, 'AuthLdap', _AuthLdapMock)
 
@@ -50,20 +50,20 @@ def test_update_user_attributes_from_lda
 
    # Authenticate with LDAP.
 
    user_data = authenticate(username, 'password')
 

	
 
    # Verify that authenication succeeded and retrieved correct attributes
 
    # from LDAP.
 
    assert user_data is not None
 
    assert user_data.get('firstname') == 'spam ldap first name'
 
    assert user_data.get('lastname') == 'spam ldap last name'
 
    assert user_data.get('firstname') == u'spam ldap first name'
 
    assert user_data.get('lastname') == u'spam ldap last name'
 
    assert user_data.get('email') == 'spam ldap email'
 

	
 
    # Verify that authentication overwrote user attributes with the ones
 
    # retrieved from LDAP.
 
    assert user.firstname == 'spam ldap first name'
 
    assert user.lastname == 'spam ldap last name'
 
    assert user.firstname == u'spam ldap first name'
 
    assert user.lastname == u'spam ldap last name'
 
    assert user.email == 'spam ldap email'
 

	
 

	
 
def test_init_user_attributes_from_ldap(monkeypatch, arrange_ldap_auth):
 
    """Authenticate unknown user with mocked LDAP, verify user is created.
 
    """
 
@@ -79,22 +79,22 @@ def test_init_user_attributes_from_ldap(
 
    # Authenticate with LDAP.
 
    user_data = authenticate(username, 'password')
 

	
 
    # Verify that authenication succeeded and retrieved correct attributes
 
    # from LDAP.
 
    assert user_data is not None
 
    assert user_data.get('firstname') == 'spam ldap first name'
 
    assert user_data.get('lastname') == 'spam ldap last name'
 
    assert user_data.get('firstname') == u'spam ldap first name'
 
    assert user_data.get('lastname') == u'spam ldap last name'
 
    assert user_data.get('email') == 'spam ldap email'
 

	
 
    # Verify that authentication created new user with attributes
 
    # retrieved from LDAP.
 
    new_user = User.get_by_username(username)
 
    assert new_user is not None
 
    assert new_user.firstname == 'spam ldap first name'
 
    assert new_user.lastname == 'spam ldap last name'
 
    assert new_user.firstname == u'spam ldap first name'
 
    assert new_user.lastname == u'spam ldap last name'
 
    assert new_user.email == 'spam ldap email'
 

	
 

	
 
class _AuthLdapNoEmailMock():
 

	
 
    def __init__(self, **kwargs):
 
@@ -122,17 +122,17 @@ def test_init_user_attributes_from_ldap_
 
    # Authenticate with LDAP.
 
    user_data = authenticate(username, 'password')
 

	
 
    # Verify that authenication succeeded and retrieved correct attributes
 
    # from LDAP, with empty email.
 
    assert user_data is not None
 
    assert user_data.get('firstname') == 'spam ldap first name'
 
    assert user_data.get('lastname') == 'spam ldap last name'
 
    assert user_data.get('firstname') == u'spam ldap first name'
 
    assert user_data.get('lastname') == u'spam ldap last name'
 
    assert user_data.get('email') == ''
 

	
 
    # Verify that authentication created new user with attributes
 
    # retrieved from LDAP, with email == None.
 
    new_user = User.get_by_username(username)
 
    assert new_user is not None
 
    assert new_user.firstname == 'spam ldap first name'
 
    assert new_user.lastname == 'spam ldap last name'
 
    assert new_user.firstname == u'spam ldap first name'
 
    assert new_user.lastname == u'spam ldap last name'
 
    assert new_user.email is None
0 comments (0 inline, 0 general)