@@ -479,97 +479,97 @@ class User(Base, BaseDbModel):
If full name is not set, fall back to username.
"""
return ('%s %s' % (self.firstname, self.lastname)
if (self.firstname and self.lastname) else self.username)
@property
def full_name_and_username(self):
Show full name and username as 'Firstname Lastname (username)'.
return ('%s %s (%s)' % (self.firstname, self.lastname, self.username)
def full_contact(self):
return '%s %s <%s>' % (self.firstname, self.lastname, self.email)
def short_contact(self):
return '%s %s' % (self.firstname, self.lastname)
def is_admin(self):
return self.admin
@hybrid_property
def is_default_user(self):
return self.username == User.DEFAULT_USER
def user_data(self):
if not self._user_data:
return {}
try:
return ext_json.loads(self._user_data)
except TypeError:
@user_data.setter
def user_data(self, val):
self._user_data = ascii_bytes(ext_json.dumps(val))
except Exception:
log.error(traceback.format_exc())
def __repr__(self):
return "<%s %s: %r')>" % (self.__class__.__name__, self.user_id, self.username)
return "<%s %s: %r>" % (self.__class__.__name__, self.user_id, self.username)
@classmethod
def guess_instance(cls, value):
return super(User, cls).guess_instance(value, User.get_by_username)
def get_or_404(cls, id_, allow_default=True):
'''
Overridden version of BaseDbModel.get_or_404, with an extra check on
the default user.
user = super(User, cls).get_or_404(id_)
if not allow_default and user.is_default_user:
raise DefaultUserException()
return user
def get_by_username_or_email(cls, username_or_email, case_insensitive=True):
For anything that looks like an email address, look up by the email address (matching
case insensitively).
For anything else, try to look up by the user name.
This assumes no normal username can have '@' symbol.
if '@' in username_or_email:
return User.get_by_email(username_or_email)
else:
return User.get_by_username(username_or_email, case_insensitive=case_insensitive)
def get_by_username(cls, username, case_insensitive=False):
if case_insensitive:
q = cls.query().filter(sqlalchemy.func.lower(cls.username) == sqlalchemy.func.lower(username))
q = cls.query().filter(cls.username == username)
return q.scalar()
def get_by_api_key(cls, api_key, fallback=True):
if len(api_key) != 40 or not api_key.isalnum():
return None
q = cls.query().filter(cls.api_key == api_key)
res = q.scalar()
if fallback and not res:
# fallback to additional keys
@@ -719,150 +719,150 @@ class UserEmailMap(Base, BaseDbModel):
def email(self, val):
self._email = val.lower() if val else None
class UserIpMap(Base, BaseDbModel):
__tablename__ = 'user_ip_map'
__table_args__ = (
UniqueConstraint('user_id', 'ip_addr'),
_table_args_default_dict,
)
ip_id = Column(Integer(), primary_key=True)
user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
ip_addr = Column(String(255), nullable=False)
active = Column(Boolean(), nullable=False, default=True)
user = relationship('User')
def _get_ip_range(cls, ip_addr):
net = ipaddr.IPNetwork(address=ip_addr)
return [str(net.network), str(net.broadcast)]
def __json__(self):
return dict(
ip_addr=self.ip_addr,
ip_range=self._get_ip_range(self.ip_addr)
return "<%s %s: %s>" % (self.__class__.__name__, self.user_id, self.ip_addr)
class UserLog(Base, BaseDbModel):
__tablename__ = 'user_logs'
user_log_id = Column(Integer(), primary_key=True)
user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=True)
username = Column(String(255), nullable=False)
repository_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=True)
repository_name = Column(Unicode(255), nullable=False)
user_ip = Column(String(255), nullable=True)
action = Column(UnicodeText(), nullable=False)
action_date = Column(DateTime(timezone=False), nullable=False)
return "<%s %r: %r')>" % (self.__class__.__name__,
return "<%s %r: %r>" % (self.__class__.__name__,
self.repository_name,
self.action)
def action_as_day(self):
return datetime.date(*self.action_date.timetuple()[:3])
repository = relationship('Repository', cascade='')
class UserGroup(Base, BaseDbModel):
__tablename__ = 'users_groups'
users_group_id = Column(Integer(), primary_key=True)
users_group_name = Column(Unicode(255), nullable=False, unique=True)
user_group_description = Column(Unicode(10000), nullable=True) # FIXME: not nullable?
users_group_active = Column(Boolean(), nullable=False)
owner_id = Column('user_id', Integer(), ForeignKey('users.user_id'), nullable=False)
created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
_group_data = Column("group_data", LargeBinary(), nullable=True) # JSON data # FIXME: not nullable?
members = relationship('UserGroupMember', cascade="all, delete-orphan")
users_group_to_perm = relationship('UserGroupToPerm', cascade='all')
users_group_repo_to_perm = relationship('UserGroupRepoToPerm', cascade='all')
users_group_repo_group_to_perm = relationship('UserGroupRepoGroupToPerm', cascade='all')
user_user_group_to_perm = relationship('UserUserGroupToPerm', cascade='all')
user_group_user_group_to_perm = relationship('UserGroupUserGroupToPerm', primaryjoin="UserGroupUserGroupToPerm.target_user_group_id==UserGroup.users_group_id", cascade='all')
owner = relationship('User')
def group_data(self):
if not self._group_data:
return ext_json.loads(self._group_data)
@group_data.setter
def group_data(self, val):
self._group_data = ascii_bytes(ext_json.dumps(val))
return "<%s %s: %r')>" % (self.__class__.__name__,
return "<%s %s: %r>" % (self.__class__.__name__,
self.users_group_id,
self.users_group_name)
return super(UserGroup, cls).guess_instance(value, UserGroup.get_by_group_name)
def get_by_group_name(cls, group_name, case_insensitive=False):
q = cls.query().filter(sqlalchemy.func.lower(cls.users_group_name) == sqlalchemy.func.lower(group_name))
q = cls.query().filter(cls.users_group_name == group_name)
def get(cls, user_group_id):
user_group = cls.query()
return user_group.get(user_group_id)
def get_api_data(self, with_members=True):
user_group = self
data = dict(
users_group_id=user_group.users_group_id,
group_name=user_group.users_group_name,
group_description=user_group.user_group_description,
active=user_group.users_group_active,
owner=user_group.owner.username,
if with_members:
data['members'] = [
ugm.user.get_api_data()
for ugm in user_group.members
]
return data
class UserGroupMember(Base, BaseDbModel):
__tablename__ = 'users_groups_members'
users_group_member_id = Column(Integer(), primary_key=True)
users_group_id = Column(Integer(), ForeignKey('users_groups.users_group_id'), nullable=False)
Status change: