@@ -431,193 +431,193 @@ class User(Base, BaseDbModel):
repo_group_to_perm = relationship('UserRepoGroupToPerm', primaryjoin='UserRepoGroupToPerm.user_id==User.user_id', cascade='all')
group_member = relationship('UserGroupMember', cascade='all')
# comments created by this user
user_comments = relationship('ChangesetComment', cascade='all')
# extra emails for this user
user_emails = relationship('UserEmailMap', cascade='all')
# extra API keys
user_api_keys = relationship('UserApiKeys', cascade='all')
ssh_keys = relationship('UserSshKeys', cascade='all')
@hybrid_property
def email(self):
return self._email
@email.setter
def email(self, val):
self._email = val.lower() if val else None
@property
def firstname(self):
# alias for future
return self.name
def emails(self):
other = UserEmailMap.query().filter(UserEmailMap.user == self).all()
return [self.email] + [x.email for x in other]
def api_keys(self):
other = UserApiKeys.query().filter(UserApiKeys.user == self).all()
return [self.api_key] + [x.api_key for x in other]
def ip_addresses(self):
ret = UserIpMap.query().filter(UserIpMap.user == self).all()
return [x.ip_addr for x in ret]
def full_name(self):
return '%s %s' % (self.firstname, self.lastname)
def full_name_or_username(self):
"""
Show full name.
If full name is not set, fall back to username.
return ('%s %s' % (self.firstname, self.lastname)
if (self.firstname and self.lastname) else self.username)
def full_name_and_username(self):
Show full name and username as 'Firstname Lastname (username)'.
return ('%s %s (%s)' % (self.firstname, self.lastname, self.username)
def full_contact(self):
return '%s %s <%s>' % (self.firstname, self.lastname, self.email)
def short_contact(self):
def is_admin(self):
return self.admin
def is_default_user(self):
return self.username == User.DEFAULT_USER
def user_data(self):
if not self._user_data:
return {}
try:
return ext_json.loads(self._user_data)
except TypeError:
@user_data.setter
def user_data(self, val):
self._user_data = ascii_bytes(ext_json.dumps(val))
except Exception:
log.error(traceback.format_exc())
def __repr__(self):
return "<%s %s: %r')>" % (self.__class__.__name__, self.user_id, self.username)
return "<%s %s: %r>" % (self.__class__.__name__, self.user_id, self.username)
@classmethod
def guess_instance(cls, value):
return super(User, cls).guess_instance(value, User.get_by_username)
def get_or_404(cls, id_, allow_default=True):
'''
Overridden version of BaseDbModel.get_or_404, with an extra check on
the default user.
user = super(User, cls).get_or_404(id_)
if not allow_default and user.is_default_user:
raise DefaultUserException()
return user
def get_by_username_or_email(cls, username_or_email, case_insensitive=True):
For anything that looks like an email address, look up by the email address (matching
case insensitively).
For anything else, try to look up by the user name.
This assumes no normal username can have '@' symbol.
if '@' in username_or_email:
return User.get_by_email(username_or_email)
else:
return User.get_by_username(username_or_email, case_insensitive=case_insensitive)
def get_by_username(cls, username, case_insensitive=False):
if case_insensitive:
q = cls.query().filter(sqlalchemy.func.lower(cls.username) == sqlalchemy.func.lower(username))
q = cls.query().filter(cls.username == username)
return q.scalar()
def get_by_api_key(cls, api_key, fallback=True):
if len(api_key) != 40 or not api_key.isalnum():
return None
q = cls.query().filter(cls.api_key == api_key)
res = q.scalar()
if fallback and not res:
# fallback to additional keys
_res = UserApiKeys.query().filter_by(api_key=api_key, is_expired=False).first()
if _res:
res = _res.user
if res is None or not res.active or res.is_default_user:
return res
def get_by_email(cls, email, cache=False):
q = cls.query().filter(sqlalchemy.func.lower(cls.email) == sqlalchemy.func.lower(email))
ret = q.scalar()
if ret is None:
q = UserEmailMap.query()
# try fetching in alternate email map
q = q.filter(sqlalchemy.func.lower(UserEmailMap.email) == sqlalchemy.func.lower(email))
q = q.options(joinedload(UserEmailMap.user))
ret = getattr(q.scalar(), 'user', None)
return ret
def get_from_cs_author(cls, author):
Tries to get User objects out of commit author string
:param author:
from kallithea.lib.helpers import email, author_name
# Valid email in the attribute passed, see if they're in the system
_email = email(author)
if _email:
user = cls.get_by_email(_email)
if user is not None:
# Maybe we can match by username?
_author = author_name(author)
user = cls.get_by_username(_author, case_insensitive=True)
def update_lastlogin(self):
"""Update user lastlogin"""
self.last_login = datetime.datetime.now()
log.debug('updated user %s lastlogin', self.username)
def get_first_admin(cls):
user = User.query().filter(User.admin == True).first()
@@ -671,246 +671,246 @@ class User(Base, BaseDbModel):
class UserApiKeys(Base, BaseDbModel):
__tablename__ = 'user_api_keys'
__table_args__ = (
Index('uak_api_key_idx', 'api_key'),
Index('uak_api_key_expires_idx', 'api_key', 'expires'),
_table_args_default_dict,
)
user_api_key_id = Column(Integer(), primary_key=True)
user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
api_key = Column(String(255), nullable=False, unique=True)
description = Column(UnicodeText(), nullable=False)
expires = Column(Float(53), nullable=False)
created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
user = relationship('User')
def is_expired(self):
return (self.expires != -1) & (time.time() > self.expires)
class UserEmailMap(Base, BaseDbModel):
__tablename__ = 'user_email_map'
Index('uem_email_idx', 'email'),
email_id = Column(Integer(), primary_key=True)
_email = Column("email", String(255), nullable=False, unique=True)
@validates('_email')
def validate_email(self, key, email):
# check if this email is not main one
main_email = Session().query(User).filter(User.email == email).scalar()
if main_email is not None:
raise AttributeError('email %s is present is user table' % email)
return email
class UserIpMap(Base, BaseDbModel):
__tablename__ = 'user_ip_map'
UniqueConstraint('user_id', 'ip_addr'),
ip_id = Column(Integer(), primary_key=True)
ip_addr = Column(String(255), nullable=False)
active = Column(Boolean(), nullable=False, default=True)
def _get_ip_range(cls, ip_addr):
net = ipaddr.IPNetwork(address=ip_addr)
return [str(net.network), str(net.broadcast)]
def __json__(self):
return dict(
ip_addr=self.ip_addr,
ip_range=self._get_ip_range(self.ip_addr)
return "<%s %s: %s>" % (self.__class__.__name__, self.user_id, self.ip_addr)
class UserLog(Base, BaseDbModel):
__tablename__ = 'user_logs'
user_log_id = Column(Integer(), primary_key=True)
user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=True)
username = Column(String(255), nullable=False)
repository_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=True)
repository_name = Column(Unicode(255), nullable=False)
user_ip = Column(String(255), nullable=True)
action = Column(UnicodeText(), nullable=False)
action_date = Column(DateTime(timezone=False), nullable=False)
return "<%s %r: %r')>" % (self.__class__.__name__,
return "<%s %r: %r>" % (self.__class__.__name__,
self.repository_name,
self.action)
def action_as_day(self):
return datetime.date(*self.action_date.timetuple()[:3])
repository = relationship('Repository', cascade='')
class UserGroup(Base, BaseDbModel):
__tablename__ = 'users_groups'
users_group_id = Column(Integer(), primary_key=True)
users_group_name = Column(Unicode(255), nullable=False, unique=True)
user_group_description = Column(Unicode(10000), nullable=True) # FIXME: not nullable?
users_group_active = Column(Boolean(), nullable=False)
owner_id = Column('user_id', Integer(), ForeignKey('users.user_id'), nullable=False)
_group_data = Column("group_data", LargeBinary(), nullable=True) # JSON data # FIXME: not nullable?
members = relationship('UserGroupMember', cascade="all, delete-orphan")
users_group_to_perm = relationship('UserGroupToPerm', cascade='all')
users_group_repo_to_perm = relationship('UserGroupRepoToPerm', cascade='all')
users_group_repo_group_to_perm = relationship('UserGroupRepoGroupToPerm', cascade='all')
user_user_group_to_perm = relationship('UserUserGroupToPerm', cascade='all')
user_group_user_group_to_perm = relationship('UserGroupUserGroupToPerm', primaryjoin="UserGroupUserGroupToPerm.target_user_group_id==UserGroup.users_group_id", cascade='all')
owner = relationship('User')
def group_data(self):
if not self._group_data:
return ext_json.loads(self._group_data)
@group_data.setter
def group_data(self, val):
self._group_data = ascii_bytes(ext_json.dumps(val))
return "<%s %s: %r')>" % (self.__class__.__name__,
return "<%s %s: %r>" % (self.__class__.__name__,
self.users_group_id,
self.users_group_name)
return super(UserGroup, cls).guess_instance(value, UserGroup.get_by_group_name)
def get_by_group_name(cls, group_name, case_insensitive=False):
q = cls.query().filter(sqlalchemy.func.lower(cls.users_group_name) == sqlalchemy.func.lower(group_name))
q = cls.query().filter(cls.users_group_name == group_name)
def get(cls, user_group_id):
user_group = cls.query()
return user_group.get(user_group_id)
def get_api_data(self, with_members=True):
user_group = self
data = dict(
users_group_id=user_group.users_group_id,
group_name=user_group.users_group_name,
group_description=user_group.user_group_description,
active=user_group.users_group_active,
owner=user_group.owner.username,
if with_members:
data['members'] = [
ugm.user.get_api_data()
for ugm in user_group.members
]
return data
class UserGroupMember(Base, BaseDbModel):
__tablename__ = 'users_groups_members'
users_group_member_id = Column(Integer(), primary_key=True)
users_group_id = Column(Integer(), ForeignKey('users_groups.users_group_id'), nullable=False)
users_group = relationship('UserGroup')
def __init__(self, gr_id='', u_id=''):
self.users_group_id = gr_id
self.user_id = u_id
class RepositoryField(Base, BaseDbModel):
__tablename__ = 'repositories_fields'
UniqueConstraint('repository_id', 'field_key'), # no-multi field
PREFIX = 'ex_' # prefix used in form to not conflict with already existing fields
repo_field_id = Column(Integer(), primary_key=True)
repository_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=False)
field_key = Column(String(250), nullable=False)
field_label = Column(String(1024), nullable=False)
field_value = Column(String(10000), nullable=False)
field_desc = Column(String(1024), nullable=False)
field_type = Column(String(255), nullable=False)
repository = relationship('Repository')
def field_key_prefixed(self):
return 'ex_%s' % self.field_key
def un_prefix_key(cls, key):
if key.startswith(cls.PREFIX):
return key[len(cls.PREFIX):]
return key
def get_by_key_name(cls, key, repo):
row = cls.query() \
.filter(cls.repository == repo) \
.filter(cls.field_key == key).scalar()
return row
class Repository(Base, BaseDbModel):
Status change: