Changeset - 20e307d5250f
[Not reviewed]
codereview
0 2 0
Marcin Kuzminski - 13 years ago 2012-05-23 00:11:45
marcin@python-works.com
Added email-map for alternative email addresses for users
2 files changed with 98 insertions and 5 deletions:
0 comments (0 inline, 0 general)
rhodecode/model/db.py
Show inline comments
 
@@ -48,6 +48,7 @@ from rhodecode.lib.caching_query import 
 

	
 
from rhodecode.model.meta import Base, Session
 
import hashlib
 
from sqlalchemy.exc import DatabaseError
 

	
 

	
 
log = logging.getLogger(__name__)
 
@@ -381,8 +382,23 @@ class User(Base, BaseModel):
 

	
 
        if cache:
 
            q = q.options(FromCache("sql_cache_short",
 
                                    "get_api_key_%s" % email))
 
        return q.scalar()
 
                                    "get_email_key_%s" % email))
 

	
 
        ret = q.scalar()
 
        if ret is None:
 
            q = UserEmailMap.query()
 
            # try fetching in alternate email map
 
            if case_insensitive:
 
                q = q.filter(UserEmailMap.email.ilike(email))
 
            else:
 
                q = q.filter(UserEmailMap.email == email)
 
            q = q.options(joinedload(UserEmailMap.user))
 
            if cache:
 
                q = q.options(FromCache("sql_cache_short",
 
                                        "get_email_map_key_%s" % email))
 
            ret = getattr(q.scalar(), 'user', None)
 

	
 
        return ret
 

	
 
    def update_lastlogin(self):
 
        """Update user lastlogin"""
 
@@ -403,6 +419,38 @@ class User(Base, BaseModel):
 
        )
 

	
 

	
 
class UserEmailMap(Base, BaseModel):
 
    __tablename__ = 'user_email_map'
 
    __table_args__ = (
 
        UniqueConstraint('email'),
 
        {'extend_existing': True, 'mysql_engine':'InnoDB',
 
         'mysql_charset': 'utf8'}
 
    )
 
    __mapper_args__ = {}
 

	
 
    email_id = Column("email_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
 
    user_id = Column("user_id", Integer(), ForeignKey('users.user_id'), nullable=True, unique=None, default=None)
 
    _email = Column("email", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=False, default=None)
 

	
 
    user = relationship('User')
 

	
 
    @validates('_email')
 
    def validate_email(self, key, email):
 
        # check if this email is not main one
 
        main_email = Session.query(User).filter(User.email == email).scalar()
 
        if main_email is not None:
 
            raise AttributeError('email %s is present is user table' % email)
 
        return email
 

	
 
    @hybrid_property
 
    def email(self):
 
        return self._email
 

	
 
    @email.setter
 
    def email(self, val):
 
        self._email = val.lower() if val else None
 

	
 

	
 
class UserLog(Base, BaseModel):
 
    __tablename__ = 'user_logs'
 
    __table_args__ = (
rhodecode/tests/test_models.py
Show inline comments
 
@@ -6,8 +6,8 @@ from rhodecode.model.repos_group import 
 
from rhodecode.model.repo import RepoModel
 
from rhodecode.model.db import RepoGroup, User, Notification, UserNotification, \
 
    UsersGroup, UsersGroupMember, Permission, UsersGroupRepoGroupToPerm,\
 
    Repository
 
from sqlalchemy.exc import IntegrityError
 
    Repository, UserEmailMap
 
from sqlalchemy.exc import IntegrityError, DatabaseError
 
from rhodecode.model.user import UserModel
 

	
 
from rhodecode.model.meta import Session
 
@@ -181,7 +181,8 @@ class TestUser(unittest.TestCase):
 
        super(TestUser, self).__init__(methodName=methodName)
 

	
 
    def test_create_and_remove(self):
 
        usr = UserModel().create_or_update(username=u'test_user', password=u'qweqwe',
 
        usr = UserModel().create_or_update(username=u'test_user', 
 
                                           password=u'qweqwe',
 
                                     email=u'u232@rhodecode.org',
 
                                     name=u'u1', lastname=u'u1')
 
        Session.commit()
 
@@ -201,6 +202,50 @@ class TestUser(unittest.TestCase):
 

	
 
        self.assertEqual(UsersGroupMember.query().all(), [])
 

	
 
    def test_additonal_email_as_main(self):
 
        usr = UserModel().create_or_update(username=u'test_user', 
 
                                           password=u'qweqwe',
 
                                     email=u'main_email@rhodecode.org',
 
                                     name=u'u1', lastname=u'u1')
 
        Session.commit()
 

	
 
        def do():
 
            m = UserEmailMap()
 
            m.email = u'main_email@rhodecode.org'
 
            m.user = usr
 
            Session.add(m)
 
            Session.commit()
 
        self.assertRaises(AttributeError, do)
 

	
 
        UserModel().delete(usr.user_id)
 
        Session.commit()
 

	
 
    def test_extra_email_map(self):
 
        usr = UserModel().create_or_update(username=u'test_user', 
 
                                           password=u'qweqwe',
 
                                     email=u'main_email@rhodecode.org',
 
                                     name=u'u1', lastname=u'u1')
 
        Session.commit()
 

	
 
        m = UserEmailMap()
 
        m.email = u'main_email2@rhodecode.org'
 
        m.user = usr
 
        Session.add(m)
 
        Session.commit()
 

	
 
        u = User.get_by_email(email='main_email@rhodecode.org')
 
        self.assertEqual(usr.user_id, u.user_id)
 
        self.assertEqual(usr.username, u.username)
 

	
 
        u = User.get_by_email(email='main_email2@rhodecode.org')
 
        self.assertEqual(usr.user_id, u.user_id)
 
        self.assertEqual(usr.username, u.username)
 
        u = User.get_by_email(email='main_email3@rhodecode.org')
 
        self.assertEqual(None, u)
 

	
 
        UserModel().delete(usr.user_id)
 
        Session.commit()
 

	
 

	
 
class TestNotifications(unittest.TestCase):
 

	
0 comments (0 inline, 0 general)