@@ -143,1062 +143,1062 @@ class BaseDbModel(object):
if callback is not None:
return callback(value)
raise Exception(
'given object must be int, long or Instance of %s '
'got %s, no callback provided' % (cls, type(value))
)
@classmethod
def get_or_404(cls, id_):
try:
id_ = int(id_)
except (TypeError, ValueError):
raise HTTPNotFound
res = cls.query().get(id_)
if res is None:
return res
def delete(cls, id_):
obj = cls.query().get(id_)
Session().delete(obj)
def __repr__(self):
return '<DB:%s>' % (self.__class__.__name__)
_table_args_default_dict = {'extend_existing': True,
'mysql_engine': 'InnoDB',
'mysql_charset': 'utf8',
'sqlite_autoincrement': True,
}
class Setting(Base, BaseDbModel):
__tablename__ = 'settings'
__table_args__ = (
_table_args_default_dict,
SETTINGS_TYPES = {
'str': safe_bytes,
'int': safe_int,
'unicode': safe_str,
'bool': str2bool,
'list': functools.partial(aslist, sep=',')
DEFAULT_UPDATE_URL = ''
app_settings_id = Column(Integer(), primary_key=True)
app_settings_name = Column(String(255), nullable=False, unique=True)
_app_settings_value = Column("app_settings_value", Unicode(4096), nullable=False)
_app_settings_type = Column("app_settings_type", String(255), nullable=True) # FIXME: not nullable?
def __init__(self, key='', val='', type='unicode'):
self.app_settings_name = key
self.app_settings_value = val
self.app_settings_type = type
@validates('_app_settings_value')
def validate_settings_value(self, key, val):
assert isinstance(val, str)
return val
@hybrid_property
def app_settings_value(self):
v = self._app_settings_value
_type = self.app_settings_type
converter = self.SETTINGS_TYPES.get(_type) or self.SETTINGS_TYPES['unicode']
return converter(v)
@app_settings_value.setter
def app_settings_value(self, val):
"""
Setter that will always make sure we use str in app_settings_value
self._app_settings_value = safe_str(val)
def app_settings_type(self):
return self._app_settings_type
@app_settings_type.setter
def app_settings_type(self, val):
if val not in self.SETTINGS_TYPES:
raise Exception('type must be one of %s got %s'
% (list(self.SETTINGS_TYPES), val))
self._app_settings_type = val
return "<%s %s.%s=%r>" % (
self.__class__.__name__,
self.app_settings_name, self.app_settings_type, self.app_settings_value
def get_by_name(cls, key):
return cls.query() \
.filter(cls.app_settings_name == key).scalar()
def get_by_name_or_create(cls, key, val='', type='unicode'):
res = cls.get_by_name(key)
res = cls(key, val, type)
def create_or_update(cls, key, val=Optional(''), type=Optional('unicode')):
Creates or updates Kallithea setting. If updates are triggered, it will only
update parameters that are explicitly set. Optional instance will be skipped.
:param key:
:param val:
:param type:
:return:
val = Optional.extract(val)
type = Optional.extract(type)
Session().add(res)
else:
res.app_settings_name = key
if not isinstance(val, Optional):
# update if set
res.app_settings_value = val
if not isinstance(type, Optional):
res.app_settings_type = type
def get_app_settings(cls):
ret = cls.query()
if ret is None:
raise Exception('Could not get application settings !')
settings = {}
for each in ret:
settings[each.app_settings_name] = \
each.app_settings_value
return settings
def get_auth_settings(cls):
ret = cls.query() \
.filter(cls.app_settings_name.startswith('auth_')).all()
fd = {}
for row in ret:
fd[row.app_settings_name] = row.app_settings_value
return fd
def get_default_repo_settings(cls, strip_prefix=False):
.filter(cls.app_settings_name.startswith('default_')).all()
key = row.app_settings_name
if strip_prefix:
key = remove_prefix(key, prefix='default_')
fd.update({key: row.app_settings_value})
def get_server_info(cls):
import pkg_resources
import platform
from kallithea.lib.utils import check_git_version
mods = [(p.project_name, p.version) for p in pkg_resources.working_set]
info = {
'modules': sorted(mods, key=lambda k: k[0].lower()),
'py_version': platform.python_version(),
'platform': platform.platform(),
'kallithea_version': kallithea.__version__,
'git_version': str(check_git_version()),
'git_path': kallithea.CONFIG.get('git_path')
return info
class Ui(Base, BaseDbModel):
__tablename__ = 'ui'
Index('ui_ui_section_ui_key_idx', 'ui_section', 'ui_key'),
UniqueConstraint('ui_section', 'ui_key'),
HOOK_UPDATE = 'changegroup.update'
HOOK_REPO_SIZE = 'changegroup.repo_size'
ui_id = Column(Integer(), primary_key=True)
ui_section = Column(String(255), nullable=False)
ui_key = Column(String(255), nullable=False)
ui_value = Column(String(255), nullable=True) # FIXME: not nullable?
ui_active = Column(Boolean(), nullable=False, default=True)
def get_by_key(cls, section, key):
""" Return specified Ui object, or None if not found. """
return cls.query().filter_by(ui_section=section, ui_key=key).scalar()
def get_or_create(cls, section, key):
""" Return specified Ui object, creating it if necessary. """
setting = cls.get_by_key(section, key)
if setting is None:
setting = cls(ui_section=section, ui_key=key)
Session().add(setting)
return setting
def get_builtin_hooks(cls):
q = cls.query()
q = q.filter(cls.ui_key.in_([cls.HOOK_UPDATE, cls.HOOK_REPO_SIZE]))
q = q.filter(cls.ui_section == 'hooks')
q = q.order_by(cls.ui_section, cls.ui_key)
return q.all()
def get_custom_hooks(cls):
q = q.filter(~cls.ui_key.in_([cls.HOOK_UPDATE, cls.HOOK_REPO_SIZE]))
def get_repos_location(cls):
return cls.get_by_key('paths', '/').ui_value
def create_or_update_hook(cls, key, val):
new_ui = cls.get_or_create('hooks', key)
new_ui.ui_active = True
new_ui.ui_value = val
return '<%s %s.%s=%r>' % (
self.ui_section, self.ui_key, self.ui_value)
class User(Base, BaseDbModel):
__tablename__ = 'users'
Index('u_username_idx', 'username'),
Index('u_email_idx', 'email'),
DEFAULT_USER = 'default'
DEFAULT_GRAVATAR_URL = 'https://secure.gravatar.com/avatar/{md5email}?d=identicon&s={size}'
# The name of the default auth type in extern_type, 'internal' lives in auth_internal.py
DEFAULT_AUTH_TYPE = 'internal'
user_id = Column(Integer(), primary_key=True)
username = Column(String(255), nullable=False, unique=True)
password = Column(String(255), nullable=False)
active = Column(Boolean(), nullable=False, default=True)
admin = Column(Boolean(), nullable=False, default=False)
name = Column("firstname", Unicode(255), nullable=False)
lastname = Column(Unicode(255), nullable=False)
_email = Column("email", String(255), nullable=True, unique=True) # FIXME: not nullable?
last_login = Column(DateTime(timezone=False), nullable=True)
extern_type = Column(String(255), nullable=True) # FIXME: not nullable?
extern_name = Column(String(255), nullable=True) # FIXME: not nullable?
api_key = Column(String(255), nullable=False)
created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
_user_data = Column("user_data", LargeBinary(), nullable=True) # JSON data # FIXME: not nullable?
user_log = relationship('UserLog')
user_perms = relationship('UserToPerm', primaryjoin="User.user_id==UserToPerm.user_id", cascade='all')
repositories = relationship('Repository')
repo_groups = relationship('RepoGroup')
user_groups = relationship('UserGroup')
user_followers = relationship('UserFollowing', primaryjoin='UserFollowing.follows_user_id==User.user_id', cascade='all')
followings = relationship('UserFollowing', primaryjoin='UserFollowing.user_id==User.user_id', cascade='all')
repo_to_perm = relationship('UserRepoToPerm', primaryjoin='UserRepoToPerm.user_id==User.user_id', cascade='all')
repo_group_to_perm = relationship('UserRepoGroupToPerm', primaryjoin='UserRepoGroupToPerm.user_id==User.user_id', cascade='all')
group_member = relationship('UserGroupMember', cascade='all')
# comments created by this user
user_comments = relationship('ChangesetComment', cascade='all')
# extra emails for this user
user_emails = relationship('UserEmailMap', cascade='all')
# extra API keys
user_api_keys = relationship('UserApiKeys', cascade='all')
ssh_keys = relationship('UserSshKeys', cascade='all')
def email(self):
return self._email
@email.setter
def email(self, val):
self._email = val.lower() if val else None
@property
def firstname(self):
# alias for future
return self.name
def emails(self):
other = UserEmailMap.query().filter(UserEmailMap.user == self).all()
return [self.email] + [x.email for x in other]
def api_keys(self):
other = UserApiKeys.query().filter(UserApiKeys.user == self).all()
return [self.api_key] + [x.api_key for x in other]
def ip_addresses(self):
ret = UserIpMap.query().filter(UserIpMap.user == self).all()
return [x.ip_addr for x in ret]
def full_name(self):
return '%s %s' % (self.firstname, self.lastname)
def full_name_or_username(self):
Show full name.
If full name is not set, fall back to username.
return ('%s %s' % (self.firstname, self.lastname)
if (self.firstname and self.lastname) else self.username)
def full_name_and_username(self):
Show full name and username as 'Firstname Lastname (username)'.
return ('%s %s (%s)' % (self.firstname, self.lastname, self.username)
def full_contact(self):
return '%s %s <%s>' % (self.firstname, self.lastname, self.email)
def short_contact(self):
def is_admin(self):
return self.admin
def is_default_user(self):
return self.username == User.DEFAULT_USER
def user_data(self):
if not self._user_data:
return {}
return ext_json.loads(self._user_data)
except TypeError:
@user_data.setter
def user_data(self, val):
self._user_data = ascii_bytes(ext_json.dumps(val))
except Exception:
log.error(traceback.format_exc())
return "<%s %s: %r')>" % (self.__class__.__name__, self.user_id, self.username)
return "<%s %s: %r>" % (self.__class__.__name__, self.user_id, self.username)
def guess_instance(cls, value):
return super(User, cls).guess_instance(value, User.get_by_username)
def get_or_404(cls, id_, allow_default=True):
'''
Overridden version of BaseDbModel.get_or_404, with an extra check on
the default user.
user = super(User, cls).get_or_404(id_)
if not allow_default and user.is_default_user:
raise DefaultUserException()
return user
def get_by_username_or_email(cls, username_or_email, case_insensitive=True):
For anything that looks like an email address, look up by the email address (matching
case insensitively).
For anything else, try to look up by the user name.
This assumes no normal username can have '@' symbol.
if '@' in username_or_email:
return User.get_by_email(username_or_email)
return User.get_by_username(username_or_email, case_insensitive=case_insensitive)
def get_by_username(cls, username, case_insensitive=False):
if case_insensitive:
q = cls.query().filter(sqlalchemy.func.lower(cls.username) == sqlalchemy.func.lower(username))
q = cls.query().filter(cls.username == username)
return q.scalar()
def get_by_api_key(cls, api_key, fallback=True):
if len(api_key) != 40 or not api_key.isalnum():
return None
q = cls.query().filter(cls.api_key == api_key)
res = q.scalar()
if fallback and not res:
# fallback to additional keys
_res = UserApiKeys.query().filter_by(api_key=api_key, is_expired=False).first()
if _res:
res = _res.user
if res is None or not res.active or res.is_default_user:
def get_by_email(cls, email, cache=False):
q = cls.query().filter(sqlalchemy.func.lower(cls.email) == sqlalchemy.func.lower(email))
ret = q.scalar()
q = UserEmailMap.query()
# try fetching in alternate email map
q = q.filter(sqlalchemy.func.lower(UserEmailMap.email) == sqlalchemy.func.lower(email))
q = q.options(joinedload(UserEmailMap.user))
ret = getattr(q.scalar(), 'user', None)
return ret
def get_from_cs_author(cls, author):
Tries to get User objects out of commit author string
:param author:
from kallithea.lib.helpers import email, author_name
# Valid email in the attribute passed, see if they're in the system
_email = email(author)
if _email:
user = cls.get_by_email(_email)
if user is not None:
# Maybe we can match by username?
_author = author_name(author)
user = cls.get_by_username(_author, case_insensitive=True)
def update_lastlogin(self):
"""Update user lastlogin"""
self.last_login = datetime.datetime.now()
log.debug('updated user %s lastlogin', self.username)
def get_first_admin(cls):
user = User.query().filter(User.admin == True).first()
if user is None:
raise Exception('Missing administrative account!')
def get_default_user(cls):
user = User.get_by_username(User.DEFAULT_USER)
raise Exception('Missing default account!')
def get_api_data(self, details=False):
Common function for generating user related data for API
user = self
data = dict(
user_id=user.user_id,
username=user.username,
firstname=user.name,
lastname=user.lastname,
email=user.email,
emails=user.emails,
active=user.active,
admin=user.admin,
if details:
data.update(dict(
extern_type=user.extern_type,
extern_name=user.extern_name,
api_key=user.api_key,
api_keys=user.api_keys,
last_login=user.last_login,
ip_addresses=user.ip_addresses
))
return data
def __json__(self):
full_name=self.full_name,
full_name_or_username=self.full_name_or_username,
short_contact=self.short_contact,
full_contact=self.full_contact
data.update(self.get_api_data())
class UserApiKeys(Base, BaseDbModel):
__tablename__ = 'user_api_keys'
Index('uak_api_key_idx', 'api_key'),
Index('uak_api_key_expires_idx', 'api_key', 'expires'),
user_api_key_id = Column(Integer(), primary_key=True)
user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
api_key = Column(String(255), nullable=False, unique=True)
description = Column(UnicodeText(), nullable=False)
expires = Column(Float(53), nullable=False)
user = relationship('User')
def is_expired(self):
return (self.expires != -1) & (time.time() > self.expires)
class UserEmailMap(Base, BaseDbModel):
__tablename__ = 'user_email_map'
Index('uem_email_idx', 'email'),
email_id = Column(Integer(), primary_key=True)
_email = Column("email", String(255), nullable=False, unique=True)
@validates('_email')
def validate_email(self, key, email):
# check if this email is not main one
main_email = Session().query(User).filter(User.email == email).scalar()
if main_email is not None:
raise AttributeError('email %s is present is user table' % email)
return email
class UserIpMap(Base, BaseDbModel):
__tablename__ = 'user_ip_map'
UniqueConstraint('user_id', 'ip_addr'),
ip_id = Column(Integer(), primary_key=True)
ip_addr = Column(String(255), nullable=False)
def _get_ip_range(cls, ip_addr):
net = ipaddr.IPNetwork(address=ip_addr)
return [str(net.network), str(net.broadcast)]
return dict(
ip_addr=self.ip_addr,
ip_range=self._get_ip_range(self.ip_addr)
return "<%s %s: %s>" % (self.__class__.__name__, self.user_id, self.ip_addr)
class UserLog(Base, BaseDbModel):
__tablename__ = 'user_logs'
user_log_id = Column(Integer(), primary_key=True)
user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=True)
username = Column(String(255), nullable=False)
repository_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=True)
repository_name = Column(Unicode(255), nullable=False)
user_ip = Column(String(255), nullable=True)
action = Column(UnicodeText(), nullable=False)
action_date = Column(DateTime(timezone=False), nullable=False)
return "<%s %r: %r')>" % (self.__class__.__name__,
return "<%s %r: %r>" % (self.__class__.__name__,
self.repository_name,
self.action)
def action_as_day(self):
return datetime.date(*self.action_date.timetuple()[:3])
repository = relationship('Repository', cascade='')
class UserGroup(Base, BaseDbModel):
__tablename__ = 'users_groups'
users_group_id = Column(Integer(), primary_key=True)
users_group_name = Column(Unicode(255), nullable=False, unique=True)
user_group_description = Column(Unicode(10000), nullable=True) # FIXME: not nullable?
users_group_active = Column(Boolean(), nullable=False)
owner_id = Column('user_id', Integer(), ForeignKey('users.user_id'), nullable=False)
_group_data = Column("group_data", LargeBinary(), nullable=True) # JSON data # FIXME: not nullable?
members = relationship('UserGroupMember', cascade="all, delete-orphan")
users_group_to_perm = relationship('UserGroupToPerm', cascade='all')
users_group_repo_to_perm = relationship('UserGroupRepoToPerm', cascade='all')
users_group_repo_group_to_perm = relationship('UserGroupRepoGroupToPerm', cascade='all')
user_user_group_to_perm = relationship('UserUserGroupToPerm', cascade='all')
user_group_user_group_to_perm = relationship('UserGroupUserGroupToPerm', primaryjoin="UserGroupUserGroupToPerm.target_user_group_id==UserGroup.users_group_id", cascade='all')
owner = relationship('User')
def group_data(self):
if not self._group_data:
return ext_json.loads(self._group_data)
@group_data.setter
def group_data(self, val):
self._group_data = ascii_bytes(ext_json.dumps(val))
return "<%s %s: %r')>" % (self.__class__.__name__,
return "<%s %s: %r>" % (self.__class__.__name__,
self.users_group_id,
self.users_group_name)
return super(UserGroup, cls).guess_instance(value, UserGroup.get_by_group_name)
def get_by_group_name(cls, group_name, case_insensitive=False):
q = cls.query().filter(sqlalchemy.func.lower(cls.users_group_name) == sqlalchemy.func.lower(group_name))
q = cls.query().filter(cls.users_group_name == group_name)
def get(cls, user_group_id):
user_group = cls.query()
return user_group.get(user_group_id)
def get_api_data(self, with_members=True):
user_group = self
users_group_id=user_group.users_group_id,
group_name=user_group.users_group_name,
group_description=user_group.user_group_description,
active=user_group.users_group_active,
owner=user_group.owner.username,
if with_members:
data['members'] = [
ugm.user.get_api_data()
for ugm in user_group.members
]
class UserGroupMember(Base, BaseDbModel):
__tablename__ = 'users_groups_members'
users_group_member_id = Column(Integer(), primary_key=True)
users_group_id = Column(Integer(), ForeignKey('users_groups.users_group_id'), nullable=False)
users_group = relationship('UserGroup')
def __init__(self, gr_id='', u_id=''):
self.users_group_id = gr_id
self.user_id = u_id
class RepositoryField(Base, BaseDbModel):
__tablename__ = 'repositories_fields'
UniqueConstraint('repository_id', 'field_key'), # no-multi field
PREFIX = 'ex_' # prefix used in form to not conflict with already existing fields
repo_field_id = Column(Integer(), primary_key=True)
repository_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=False)
field_key = Column(String(250), nullable=False)
field_label = Column(String(1024), nullable=False)
field_value = Column(String(10000), nullable=False)
field_desc = Column(String(1024), nullable=False)
field_type = Column(String(255), nullable=False)
repository = relationship('Repository')
def field_key_prefixed(self):
return 'ex_%s' % self.field_key
def un_prefix_key(cls, key):
if key.startswith(cls.PREFIX):
return key[len(cls.PREFIX):]
return key
def get_by_key_name(cls, key, repo):
row = cls.query() \
.filter(cls.repository == repo) \
.filter(cls.field_key == key).scalar()
return row
class Repository(Base, BaseDbModel):
__tablename__ = 'repositories'
Index('r_repo_name_idx', 'repo_name'),
DEFAULT_CLONE_URI = '{scheme}://{user}@{netloc}/{repo}'
DEFAULT_CLONE_SSH = 'ssh://{system_user}@{hostname}/{repo}'
STATE_CREATED = 'repo_state_created'
STATE_PENDING = 'repo_state_pending'
STATE_ERROR = 'repo_state_error'
repo_id = Column(Integer(), primary_key=True)
repo_name = Column(Unicode(255), nullable=False, unique=True)
repo_state = Column(String(255), nullable=False)
clone_uri = Column(String(255), nullable=True) # FIXME: not nullable?
repo_type = Column(String(255), nullable=False) # 'hg' or 'git'
private = Column(Boolean(), nullable=False)
enable_statistics = Column("statistics", Boolean(), nullable=False, default=True)
enable_downloads = Column("downloads", Boolean(), nullable=False, default=True)
description = Column(Unicode(10000), nullable=False)
updated_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
_landing_revision = Column("landing_revision", String(255), nullable=False)
_changeset_cache = Column("changeset_cache", LargeBinary(), nullable=True) # JSON data # FIXME: not nullable?
fork_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=True)
group_id = Column(Integer(), ForeignKey('groups.group_id'), nullable=True)
fork = relationship('Repository', remote_side=repo_id)
group = relationship('RepoGroup')
repo_to_perm = relationship('UserRepoToPerm', cascade='all', order_by='UserRepoToPerm.repo_to_perm_id')
users_group_to_perm = relationship('UserGroupRepoToPerm', cascade='all')
stats = relationship('Statistics', cascade='all', uselist=False)
followers = relationship('UserFollowing',
primaryjoin='UserFollowing.follows_repository_id==Repository.repo_id',
cascade='all')
extra_fields = relationship('RepositoryField',
cascade="all, delete-orphan")
logs = relationship('UserLog')
comments = relationship('ChangesetComment', cascade="all, delete-orphan")
pull_requests_org = relationship('PullRequest',
primaryjoin='PullRequest.org_repo_id==Repository.repo_id',
pull_requests_other = relationship('PullRequest',
primaryjoin='PullRequest.other_repo_id==Repository.repo_id',
self.repo_id, self.repo_name)
def landing_rev(self):
# always should return [rev_type, rev]
if self._landing_revision:
_rev_info = self._landing_revision.split(':')
if len(_rev_info) < 2:
_rev_info.insert(0, 'rev')
return [_rev_info[0], _rev_info[1]]
return [None, None]
@landing_rev.setter
def landing_rev(self, val):
if ':' not in val:
raise ValueError('value must be delimited with `:` and consist '
'of <rev_type>:<rev>, got %s instead' % val)
self._landing_revision = val
def changeset_cache(self):
cs_cache = ext_json.loads(self._changeset_cache) # might raise on bad data
cs_cache['raw_id'] # verify data, raise exception on error
return cs_cache
except (TypeError, KeyError, ValueError):
return EmptyChangeset().__json__()
@changeset_cache.setter
def changeset_cache(self, val):
self._changeset_cache = ascii_bytes(ext_json.dumps(val))
def query(cls, sorted=False):
"""Add Repository-specific helpers for common query constructs.
sorted: if True, apply the default ordering (name, case insensitive).
q = super(Repository, cls).query()
if sorted:
q = q.order_by(sqlalchemy.func.lower(Repository.repo_name))
return q
def normalize_repo_name(cls, repo_name):
Normalizes os specific repo_name to the format internally stored inside
database using URL_SEP
:param cls:
:param repo_name:
return URL_SEP.join(repo_name.split(os.sep))
return super(Repository, cls).guess_instance(value, Repository.get_by_repo_name)
def get_by_repo_name(cls, repo_name, case_insensitive=False):
"""Get the repo, defaulting to database case sensitivity.
case_insensitive will be slower and should only be specified if necessary."""
q = Session().query(cls).filter(sqlalchemy.func.lower(cls.repo_name) == sqlalchemy.func.lower(repo_name))
q = Session().query(cls).filter(cls.repo_name == repo_name)
q = q.options(joinedload(Repository.fork)) \
.options(joinedload(Repository.owner)) \
.options(joinedload(Repository.group))
def get_by_full_path(cls, repo_full_path):
base_full_path = os.path.realpath(kallithea.CONFIG['base_path'])
repo_full_path = os.path.realpath(repo_full_path)
assert repo_full_path.startswith(base_full_path + os.path.sep)
repo_name = repo_full_path[len(base_full_path) + 1:]
repo_name = cls.normalize_repo_name(repo_name)
return cls.get_by_repo_name(repo_name.strip(URL_SEP))
def get_repo_forks(cls, repo_id):
return cls.query().filter(Repository.fork_id == repo_id)
def forks(self):
Return forks of this repo
return Repository.get_repo_forks(self.repo_id)
def parent(self):
Returns fork parent
return self.fork
def just_name(self):
return self.repo_name.split(URL_SEP)[-1]
def groups_with_parents(self):
groups = []
group = self.group
while group is not None:
groups.append(group)
group = group.parent_group
assert group not in groups, group # avoid recursion on bad db content
groups.reverse()
return groups
def repo_full_path(self):
Returns base full path for the repository - where it actually
exists on a filesystem.
p = [kallithea.CONFIG['base_path']]
# we need to split the name by / since this is how we store the
# names in the database, but that eventually needs to be converted
# into a valid system path
p += self.repo_name.split(URL_SEP)
return os.path.join(*p)
def get_new_name(self, repo_name):
returns new full repository name based on assigned group and new new
:param group_name:
path_prefix = self.group.full_path_splitted if self.group else []
return URL_SEP.join(path_prefix + [repo_name])
def _ui(self):
Creates an db based ui object for this repository
from kallithea.lib.utils import make_ui
return make_ui()
def is_valid(cls, repo_name):
returns True if given repo name is a valid filesystem repository
from kallithea.lib.utils import is_valid_repo
return is_valid_repo(repo_name, kallithea.CONFIG['base_path'])
def get_api_data(self, with_revision_names=False,
with_pullrequests=False):
Common function for generating repo api data.
Optionally, also return tags, branches, bookmarks and PRs.
repo = self
repo_id=repo.repo_id,
repo_name=repo.repo_name,
repo_type=repo.repo_type,
clone_uri=repo.clone_uri,
private=repo.private,
created_on=repo.created_on,
description=repo.description,
landing_rev=repo.landing_rev,
owner=repo.owner.username,
fork_of=repo.fork.repo_name if repo.fork else None,
enable_statistics=repo.enable_statistics,
enable_downloads=repo.enable_downloads,
last_changeset=repo.changeset_cache,
if with_revision_names:
scm_repo = repo.scm_instance_no_cache()
tags=scm_repo.tags,
branches=scm_repo.branches,
bookmarks=scm_repo.bookmarks,
if with_pullrequests:
data['pull_requests'] = repo.pull_requests_other
rc_config = Setting.get_app_settings()
repository_fields = str2bool(rc_config.get('repository_fields'))
if repository_fields:
for f in self.extra_fields:
data[f.field_key_prefixed] = f.field_value
def last_db_change(self):
return self.updated_on
def clone_uri_hidden(self):
clone_uri = self.clone_uri
if clone_uri:
import urlobject
url_obj = urlobject.URLObject(self.clone_uri)
if url_obj.password:
clone_uri = url_obj.with_password('*****')
return clone_uri
def clone_url(self, clone_uri_tmpl, with_id=False, username=None):
if '{repo}' not in clone_uri_tmpl and '_{repoid}' not in clone_uri_tmpl:
log.error("Configured clone_uri_tmpl %r has no '{repo}' or '_{repoid}' and cannot toggle to use repo id URLs", clone_uri_tmpl)
elif with_id:
clone_uri_tmpl = clone_uri_tmpl.replace('{repo}', '_{repoid}')
clone_uri_tmpl = clone_uri_tmpl.replace('_{repoid}', '{repo}')
import kallithea.lib.helpers as h
prefix_url = h.canonical_url('home')
return get_clone_url(clone_uri_tmpl=clone_uri_tmpl,
prefix_url=prefix_url,
repo_name=self.repo_name,
repo_id=self.repo_id,
username=username)
Status change: