Changeset - 58b6e4cd6fe9
[Not reviewed]
default
0 15 0
Mads Kiilerich - 6 years ago 2019-12-28 13:38:22
mads@kiilerich.com
Grafted from: d6c192808c3d
lib: clean up ext_json and how it is used - avoid monkey patching

Note that py3 json.dumps will return ASCII (with all unicode escaped) encoded
as str. But we generally want JSON as bytes (which json.loads also can read),
so also wrap the result with ascii_bytes in many places.
15 files changed with 94 insertions and 95 deletions:
0 comments (0 inline, 0 general)
kallithea/bin/base.py
Show inline comments
 
@@ -28,13 +28,14 @@ Original author and date, and relevant c
 
import os
 
import pprint
 
import random
 
import sys
 
import urllib2
 

	
 
from kallithea.lib.compat import json
 
from kallithea.lib import ext_json
 
from kallithea.lib.utils2 import ascii_bytes
 

	
 

	
 
CONFIG_NAME = '.config/kallithea'
 
FORMAT_PRETTY = 'pretty'
 
FORMAT_JSON = 'json'
 

	
 
@@ -65,17 +66,17 @@ def api_call(apikey, apihost, method=Non
 

	
 
    if not method:
 
        raise Exception('please specify method name !')
 
    apihost = apihost.rstrip('/')
 
    id_ = random.randrange(1, 9999)
 
    req = urllib2.Request('%s/_admin/api' % apihost,
 
                      data=json.dumps(_build_data(id_)),
 
                      data=ascii_bytes(ext_json.dumps(_build_data(id_))),
 
                      headers={'content-type': 'text/plain'})
 
    ret = urllib2.urlopen(req)
 
    raw_json = ret.read()
 
    json_data = json.loads(raw_json)
 
    json_data = ext_json.loads(raw_json)
 
    id_ret = json_data['id']
 
    if id_ret == id_:
 
        return json_data
 

	
 
    else:
 
        _formatted_json = pprint.pformat(json_data)
 
@@ -125,13 +126,13 @@ class RcConf(object):
 
        :param config:
 
        """
 
        update = False
 
        if os.path.exists(self._conf_name):
 
            update = True
 
        with open(self._conf_name, 'wb') as f:
 
            json.dump(config, f, indent=4)
 
            ext_json.dump(config, f, indent=4)
 
            f.write('\n')
 

	
 
        if update:
 
            sys.stdout.write('Updated config in %s\n' % self._conf_name)
 
        else:
 
            sys.stdout.write('Created new config in %s\n' % self._conf_name)
 
@@ -143,23 +144,23 @@ class RcConf(object):
 

	
 
        :param new_config:
 
        """
 
        config = {}
 
        try:
 
            with open(self._conf_name, 'rb') as conf:
 
                config = json.load(conf)
 
                config = ext_json.load(conf)
 
        except IOError as e:
 
            sys.stderr.write(str(e) + '\n')
 

	
 
        config.update(new_config)
 
        self.make_config(config)
 

	
 
    def load_config(self):
 
        """
 
        Loads config from file and returns loaded JSON object
 
        """
 
        try:
 
            with open(self._conf_name, 'rb') as conf:
 
                return json.load(conf)
 
                return ext_json.load(conf)
 
        except IOError as e:
 
            #sys.stderr.write(str(e) + '\n')
 
            pass
kallithea/bin/kallithea_api.py
Show inline comments
 
@@ -25,15 +25,16 @@ Original author and date, and relevant c
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
from __future__ import print_function
 

	
 
import argparse
 
import json
 
import sys
 

	
 
from kallithea.bin.base import FORMAT_JSON, FORMAT_PRETTY, RcConf, api_call, json
 
from kallithea.bin.base import FORMAT_JSON, FORMAT_PRETTY, RcConf, api_call
 

	
 

	
 
def argparser(argv):
 
    usage = (
 
      "kallithea-api [-h] [--format=FORMAT] [--apikey=APIKEY] [--apihost=APIHOST] "
 
      "[--config=CONFIG] [--save-config] "
kallithea/bin/kallithea_gist.py
Show inline comments
 
@@ -26,17 +26,18 @@ Original author and date, and relevant c
 
"""
 

	
 
from __future__ import print_function
 

	
 
import argparse
 
import fileinput
 
import json
 
import os
 
import stat
 
import sys
 

	
 
from kallithea.bin.base import FORMAT_JSON, FORMAT_PRETTY, RcConf, api_call, json
 
from kallithea.bin.base import FORMAT_JSON, FORMAT_PRETTY, RcConf, api_call
 

	
 

	
 
def argparser(argv):
 
    usage = (
 
      "kallithea-gist [-h] [--format=FORMAT] [--apikey=APIKEY] [--apihost=APIHOST] "
 
      "[--config=CONFIG] [--save-config] [GIST OPTIONS] "
kallithea/bin/ldap_sync.py
Show inline comments
 
@@ -30,13 +30,14 @@ from __future__ import print_function
 
import urllib2
 
import uuid
 
from ConfigParser import ConfigParser
 

	
 
import ldap
 

	
 
from kallithea.lib.compat import json
 
from kallithea.lib import ext_json
 
from kallithea.lib.utils2 import ascii_bytes
 

	
 

	
 
config = ConfigParser()
 
config.read('ldap_sync.conf')
 

	
 

	
 
@@ -77,18 +78,18 @@ class API(object):
 
        This will generate the UUID for validation check after the
 
        response is returned. Handle errors and get the result back.
 
        """
 
        uid = str(uuid.uuid1())
 
        data = self.get_api_data(uid, method, args)
 

	
 
        data = json.dumps(data)
 
        data = ascii_bytes(ext_json.dumps(data))
 
        headers = {'content-type': 'text/plain'}
 
        req = urllib2.Request(self.url, data, headers)
 

	
 
        response = urllib2.urlopen(req)
 
        response = json.load(response)
 
        response = ext_json.load(response)
 

	
 
        if uid != response["id"]:
 
            raise InvalidResponseIDError("UUID does not match.")
 

	
 
        if response["error"] is not None:
 
            raise ResponseError(response["error"])
kallithea/controllers/api/__init__.py
Show inline comments
 
@@ -32,17 +32,17 @@ import time
 
import traceback
 
import types
 

	
 
from tg import Response, TGController, request, response
 
from webob.exc import HTTPError, HTTPException
 

	
 
from kallithea.lib import ext_json
 
from kallithea.lib.auth import AuthUser
 
from kallithea.lib.base import _get_ip_addr as _get_ip
 
from kallithea.lib.base import get_path_info
 
from kallithea.lib.compat import json
 
from kallithea.lib.utils2 import safe_str
 
from kallithea.lib.utils2 import ascii_bytes, safe_str
 
from kallithea.model.db import User
 

	
 

	
 
log = logging.getLogger('JSONRPC')
 

	
 

	
 
@@ -118,13 +118,13 @@ class JSONRPCController(TGController):
 
            raise JSONRPCErrorResponse(retid=self._req_id,
 
                                       message="Content-Length is 0")
 

	
 
        raw_body = environ['wsgi.input'].read(length)
 

	
 
        try:
 
            json_body = json.loads(raw_body)
 
            json_body = ext_json.loads(raw_body)
 
        except ValueError as e:
 
            # catch JSON errors Here
 
            raise JSONRPCErrorResponse(retid=self._req_id,
 
                                       message="JSON parse error ERR:%s RAW:%r"
 
                                                % (e, raw_body))
 

	
 
@@ -235,22 +235,22 @@ class JSONRPCController(TGController):
 

	
 
        if self._error is not None:
 
            raw_response = None
 

	
 
        response = dict(id=self._req_id, result=raw_response, error=self._error)
 
        try:
 
            return json.dumps(response)
 
            return ascii_bytes(ext_json.dumps(response))
 
        except TypeError as e:
 
            log.error('API FAILED. Error encoding response for %s %s: %s\n%s', action, rpc_args, e, traceback.format_exc())
 
            return json.dumps(
 
            return ascii_bytes(ext_json.dumps(
 
                dict(
 
                    id=self._req_id,
 
                    result=None,
 
                    error="Error encoding response"
 
                    error="Error encoding response",
 
                )
 
            )
 
            ))
 

	
 
    def _find_method(self):
 
        """
 
        Return method named by `self._req_method` in controller if able
 
        """
 
        log.debug('Trying to find JSON-RPC method: %s', self._req_method)
kallithea/controllers/summary.py
Show inline comments
 
@@ -37,16 +37,16 @@ from tg import request
 
from tg import tmpl_context as c
 
from tg.i18n import ugettext as _
 
from webob.exc import HTTPBadRequest
 

	
 
import kallithea.lib.helpers as h
 
from kallithea.config.conf import ALL_EXTS, ALL_READMES, LANGUAGES_EXTENSIONS_MAP
 
from kallithea.lib import ext_json
 
from kallithea.lib.auth import HasRepoPermissionLevelDecorator, LoginRequired
 
from kallithea.lib.base import BaseRepoController, jsonify, render
 
from kallithea.lib.celerylib.tasks import get_commits_stats
 
from kallithea.lib.compat import json
 
from kallithea.lib.markup_renderer import MarkupRenderer
 
from kallithea.lib.page import Page
 
from kallithea.lib.utils2 import safe_int, safe_unicode
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 
from kallithea.lib.vcs.exceptions import ChangesetError, EmptyRepositoryError, NodeDoesNotExistError
 
from kallithea.lib.vcs.nodes import FileNode
 
@@ -136,13 +136,13 @@ class SummaryController(BaseRepoControll
 
            .scalar()
 

	
 
        c.stats_percentage = 0
 

	
 
        if stats and stats.languages:
 
            c.no_data = False is c.db_repo.enable_statistics
 
            lang_stats_d = json.loads(stats.languages)
 
            lang_stats_d = ext_json.loads(stats.languages)
 

	
 
            lang_stats = [(x, {"count": y,
 
                               "desc": LANGUAGES_EXTENSIONS_MAP.get(x, '?')})
 
                          for x, y in lang_stats_d.items()]
 
            lang_stats.sort(key=lambda k: (-k[1]['count'], k[0]))
 

	
 
@@ -188,15 +188,15 @@ class SummaryController(BaseRepoControll
 
        stats = Statistics.query() \
 
            .filter(Statistics.repository == c.db_repo) \
 
            .scalar()
 
        c.stats_percentage = 0
 
        if stats and stats.languages:
 
            c.no_data = False is c.db_repo.enable_statistics
 
            lang_stats_d = json.loads(stats.languages)
 
            c.commit_data = json.loads(stats.commit_activity)
 
            c.overview_data = json.loads(stats.commit_activity_combined)
 
            lang_stats_d = ext_json.loads(stats.languages)
 
            c.commit_data = ext_json.loads(stats.commit_activity)
 
            c.overview_data = ext_json.loads(stats.commit_activity_combined)
 

	
 
            lang_stats = ((x, {"count": y,
 
                               "desc": LANGUAGES_EXTENSIONS_MAP.get(x)})
 
                          for x, y in lang_stats_d.items())
 

	
 
            c.trending_languages = (
kallithea/lib/auth_modules/auth_crowd.py
Show inline comments
 
@@ -27,14 +27,15 @@ Original author and date, and relevant c
 

	
 

	
 
import base64
 
import logging
 
import urllib2
 

	
 
from kallithea.lib import auth_modules
 
from kallithea.lib.compat import formatted_json, hybrid_property, json
 
from kallithea.lib import auth_modules, ext_json
 
from kallithea.lib.compat import formatted_json, hybrid_property
 
from kallithea.lib.utils2 import ascii_bytes
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class CrowdServer(object):
 
@@ -100,13 +101,13 @@ class CrowdServer(object):
 
            msg = "".join(rdoc.readlines())
 
            if not msg and empty_response_ok:
 
                rval = {}
 
                rval["status"] = True
 
                rval["error"] = "Response body was empty"
 
            elif not noformat:
 
                rval = json.loads(msg)
 
                rval = ext_json.loads(msg)
 
                rval["status"] = True
 
            else:
 
                rval = "".join(rdoc.readlines())
 
        except Exception as e:
 
            if not noformat:
 
                rval = {"status": False,
 
@@ -118,13 +119,13 @@ class CrowdServer(object):
 

	
 
    def user_auth(self, username, password):
 
        """Authenticate a user against crowd. Returns brief information about
 
        the user."""
 
        url = ("%s/rest/usermanagement/%s/authentication?username=%s"
 
               % (self._uri, self._version, urllib2.quote(username)))
 
        body = json.dumps({"value": password})
 
        body = ascii_bytes(ext_json.dumps({"value": password}))
 
        return self._request(url, body)
 

	
 
    def user_groups(self, username):
 
        """Retrieve a list of groups to which this user belongs."""
 
        url = ("%s/rest/usermanagement/%s/user/group/nested?username=%s"
 
               % (self._uri, self._version, urllib2.quote(username)))
kallithea/lib/base.py
Show inline comments
 
@@ -42,18 +42,17 @@ import webob.exc
 
from tg import TGController, config, render_template, request, response, session
 
from tg import tmpl_context as c
 
from tg.i18n import ugettext as _
 

	
 
from kallithea import BACKENDS, __version__
 
from kallithea.config.routing import url
 
from kallithea.lib import auth_modules
 
from kallithea.lib import auth_modules, ext_json
 
from kallithea.lib.auth import AuthUser, HasPermissionAnyMiddleware
 
from kallithea.lib.compat import json
 
from kallithea.lib.exceptions import UserCreationError
 
from kallithea.lib.utils import get_repo_slug, is_valid_repo
 
from kallithea.lib.utils2 import AttributeDict, safe_int, safe_str, safe_unicode, set_hook_environment, str2bool
 
from kallithea.lib.utils2 import AttributeDict, ascii_bytes, safe_int, safe_str, safe_unicode, set_hook_environment, str2bool
 
from kallithea.lib.vcs.exceptions import ChangesetDoesNotExistError, EmptyRepositoryError, RepositoryError
 
from kallithea.model import meta
 
from kallithea.model.db import PullRequest, Repository, Setting, User
 
from kallithea.model.scm import ScmModel
 

	
 

	
 
@@ -627,13 +626,13 @@ def jsonify(func, *args, **kwargs):
 
        msg = "JSON responses with Array envelopes are susceptible to " \
 
              "cross-site data leak attacks, see " \
 
              "https://web.archive.org/web/20120519231904/http://wiki.pylonshq.com/display/pylonsfaq/Warnings"
 
        warnings.warn(msg, Warning, 2)
 
        log.warning(msg)
 
    log.debug("Returning JSON wrapped action output")
 
    return json.dumps(data)
 
    return ascii_bytes(ext_json.dumps(data))
 

	
 
@decorator.decorator
 
def IfSshEnabled(func, *args, **kwargs):
 
    """Decorator for functions that can only be called if SSH access is enabled.
 

	
 
    If SSH access is disabled in the configuration file, HTTPNotFound is raised.
kallithea/lib/celerylib/tasks.py
Show inline comments
 
@@ -34,19 +34,18 @@ from collections import OrderedDict
 
from operator import itemgetter
 
from time import mktime
 

	
 
from tg import config
 

	
 
from kallithea import CELERY_ON
 
from kallithea.lib import celerylib
 
from kallithea.lib.compat import json
 
from kallithea.lib import celerylib, ext_json
 
from kallithea.lib.helpers import person
 
from kallithea.lib.hooks import log_create_repository
 
from kallithea.lib.rcmail.smtp_mailer import SmtpMailer
 
from kallithea.lib.utils import action_logger
 
from kallithea.lib.utils2 import str2bool
 
from kallithea.lib.utils2 import ascii_bytes, str2bool
 
from kallithea.lib.vcs.utils import author_email
 
from kallithea.model.db import RepoGroup, Repository, Statistics, User
 

	
 

	
 
__all__ = ['whoosh_index', 'get_commits_stats', 'send_email']
 

	
 
@@ -115,15 +114,15 @@ def get_commits_stats(repo_name, ts_min_
 
            # current state of parsing revision(from db marker) is the
 
            # last revision
 
            lock.release()
 
            return True
 

	
 
        if cur_stats:
 
            commits_by_day_aggregate = OrderedDict(json.loads(
 
            commits_by_day_aggregate = OrderedDict(ext_json.loads(
 
                                        cur_stats.commit_activity_combined))
 
            co_day_auth_aggr = json.loads(cur_stats.commit_activity)
 
            co_day_auth_aggr = ext_json.loads(cur_stats.commit_activity)
 

	
 
        log.debug('starting parsing %s', parse_limit)
 

	
 
        last_rev = last_rev + 1 if last_rev and last_rev >= 0 else 0
 
        log.debug('Getting revisions from %s to %s',
 
             last_rev, last_rev + parse_limit
 
@@ -190,22 +189,22 @@ def get_commits_stats(repo_name, ts_min_
 
                "label": akc(repo.contact),
 
                "data": [0, 1],
 
                "schema": ["commits"],
 
            }
 

	
 
        stats = cur_stats if cur_stats else Statistics()
 
        stats.commit_activity = json.dumps(co_day_auth_aggr)
 
        stats.commit_activity_combined = json.dumps(overview_data)
 
        stats.commit_activity = ascii_bytes(ext_json.dumps(co_day_auth_aggr))
 
        stats.commit_activity_combined = ascii_bytes(ext_json.dumps(overview_data))
 

	
 
        log.debug('last revision %s', last_rev)
 
        leftovers = len(repo.revisions[last_rev:])
 
        log.debug('revisions to parse %s', leftovers)
 

	
 
        if last_rev == 0 or leftovers < parse_limit:
 
            log.debug('getting code trending stats')
 
            stats.languages = json.dumps(__get_codes_stats(repo_name))
 
            stats.languages = ascii_bytes(ext_json.dumps(__get_codes_stats(repo_name)))
 

	
 
        try:
 
            stats.repository = dbrepo
 
            stats.stat_on_revision = last_cs.revision if last_cs else 0
 
            DBS.add(stats)
 
            DBS.commit()
kallithea/lib/compat.py
Show inline comments
 
@@ -39,21 +39,16 @@ from sqlalchemy.ext.hybrid import hybrid
 
#==============================================================================
 
from sqlalchemy.util import OrderedSet
 

	
 
#==============================================================================
 
# json
 
#==============================================================================
 
from kallithea.lib.ext_json import json
 
from kallithea.lib import ext_json
 

	
 

	
 
# alias for formatted json
 
formatted_json = functools.partial(json.dumps, indent=4, sort_keys=True)
 

	
 

	
 

	
 

	
 
formatted_json = functools.partial(ext_json.dumps, indent=4, sort_keys=True)
 

	
 

	
 
#==============================================================================
 
# kill
 
#==============================================================================
 
if os.name == 'nt': # Windows
kallithea/lib/ext_json.py
Show inline comments
 
"""
 
Extended JSON encoder for json
 
Extended JSON encoder with support for more data types
 

	
 
json.org does not specify how date time can be represented - monkeypatch it to do something.
 
json.org does not specify how date time can be represented - just encode it somehow and ignore decoding ...
 
"""
 

	
 
import datetime
 
import decimal
 
import functools
 
import json  # is re-exported after monkey patching
 
import json
 

	
 

	
 
__all__ = ['json']
 
__all__ = ['dumps', 'dump', 'load', 'loads']
 

	
 

	
 
def _is_tz_aware(value):
 
    """
 
    Determines if a given datetime.time is timezone aware.
 

	
 
@@ -67,13 +67,15 @@ def _obj_dump(obj):
 

	
 
class ExtendedEncoder(json.JSONEncoder):
 
    def default(self, obj):
 
        try:
 
            return _obj_dump(obj)
 
        except NotImplementedError:
 
            pass
 
            pass  # quiet skipping of unsupported types!
 
        raise TypeError("%r is not JSON serializable" % (obj,))
 

	
 

	
 
# monkey-patch and export JSON encoder to use custom encoding method
 
json.dumps = functools.partial(json.dumps, cls=ExtendedEncoder)
 
json.dump = functools.partial(json.dump, cls=ExtendedEncoder)
 
dumps = functools.partial(json.dumps, cls=ExtendedEncoder)
 
dump = functools.partial(json.dump, cls=ExtendedEncoder)
 
# No special support for loading these types back!!!
 
load = json.load
 
loads = json.loads
kallithea/lib/utils2.py
Show inline comments
 
@@ -28,24 +28,24 @@ Original author and date, and relevant c
 
"""
 

	
 
from __future__ import print_function
 

	
 
import binascii
 
import datetime
 
import json
 
import os
 
import pwd
 
import re
 
import time
 
import urllib
 

	
 
import urlobject
 
from tg.i18n import ugettext as _
 
from tg.i18n import ungettext
 
from webhelpers2.text import collapse, remove_formatting, strip_tags
 

	
 
from kallithea.lib.compat import json
 
from kallithea.lib.vcs.utils import ascii_bytes, ascii_str, safe_bytes, safe_str, safe_unicode  # re-export
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 

	
 

	
 
def str2bool(_str):
 
    """
kallithea/model/db.py
Show inline comments
 
@@ -42,17 +42,17 @@ from sqlalchemy import Boolean, Column, 
 
from sqlalchemy.ext.hybrid import hybrid_property
 
from sqlalchemy.orm import class_mapper, joinedload, relationship, validates
 
from tg.i18n import lazy_ugettext as _
 
from webob.exc import HTTPNotFound
 

	
 
import kallithea
 
from kallithea.lib import ext_json
 
from kallithea.lib.caching_query import FromCache
 
from kallithea.lib.compat import json
 
from kallithea.lib.exceptions import DefaultUserException
 
from kallithea.lib.utils2 import (
 
    Optional, aslist, get_changeset_safe, get_clone_url, remove_prefix, safe_bytes, safe_int, safe_str, safe_unicode, str2bool, urlreadable)
 
    Optional, ascii_bytes, aslist, get_changeset_safe, get_clone_url, remove_prefix, safe_bytes, safe_int, safe_str, safe_unicode, str2bool, urlreadable)
 
from kallithea.lib.vcs import get_backend
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 
from kallithea.lib.vcs.utils.helpers import get_scm
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 
from kallithea.model.meta import Base, Session
 

	
 
@@ -518,20 +518,20 @@ class User(Base, BaseDbModel):
 
    @hybrid_property
 
    def user_data(self):
 
        if not self._user_data:
 
            return {}
 

	
 
        try:
 
            return json.loads(self._user_data)
 
            return ext_json.loads(self._user_data)
 
        except TypeError:
 
            return {}
 

	
 
    @user_data.setter
 
    def user_data(self, val):
 
        try:
 
            self._user_data = json.dumps(val)
 
            self._user_data = ascii_bytes(ext_json.dumps(val))
 
        except Exception:
 
            log.error(traceback.format_exc())
 

	
 
    def __repr__(self):
 
        return "<%s %s: %r')>" % (self.__class__.__name__, self.user_id, self.username)
 

	
 
@@ -830,20 +830,20 @@ class UserGroup(Base, BaseDbModel):
 
    @hybrid_property
 
    def group_data(self):
 
        if not self._group_data:
 
            return {}
 

	
 
        try:
 
            return json.loads(self._group_data)
 
            return ext_json.loads(self._group_data)
 
        except TypeError:
 
            return {}
 

	
 
    @group_data.setter
 
    def group_data(self, val):
 
        try:
 
            self._group_data = json.dumps(val)
 
            self._group_data = ascii_bytes(ext_json.dumps(val))
 
        except Exception:
 
            log.error(traceback.format_exc())
 

	
 
    def __repr__(self):
 
        return "<%s %s: %r')>" % (self.__class__.__name__,
 
                                  self.users_group_id,
 
@@ -1029,22 +1029,22 @@ class Repository(Base, BaseDbModel):
 
                             'of <rev_type>:<rev>, got %s instead' % val)
 
        self._landing_revision = val
 

	
 
    @hybrid_property
 
    def changeset_cache(self):
 
        try:
 
            cs_cache = json.loads(self._changeset_cache) # might raise on bad data
 
            cs_cache = ext_json.loads(self._changeset_cache) # might raise on bad data
 
            cs_cache['raw_id'] # verify data, raise exception on error
 
            return cs_cache
 
        except (TypeError, KeyError, ValueError):
 
            return EmptyChangeset().__json__()
 

	
 
    @changeset_cache.setter
 
    def changeset_cache(self, val):
 
        try:
 
            self._changeset_cache = json.dumps(val)
 
            self._changeset_cache = ascii_bytes(ext_json.dumps(val))
 
        except Exception:
 
            log.error(traceback.format_exc())
 

	
 
    @classmethod
 
    def query(cls, sorted=False):
 
        """Add Repository-specific helpers for common query constructs.
kallithea/model/gist.py
Show inline comments
 
@@ -29,14 +29,14 @@ import logging
 
import os
 
import random
 
import shutil
 
import time
 
import traceback
 

	
 
from kallithea.lib.compat import json
 
from kallithea.lib.utils2 import AttributeDict, safe_int, safe_unicode, time_to_datetime
 
from kallithea.lib import ext_json
 
from kallithea.lib.utils2 import AttributeDict, ascii_bytes, safe_int, safe_unicode, time_to_datetime
 
from kallithea.model.db import Gist, Session, User
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.scm import ScmModel
 

	
 

	
 
log = logging.getLogger(__name__)
 
@@ -79,13 +79,13 @@ class GistModel(object):
 
            'gist_owner_id': user_id,
 
            'gist_type': gist_type,
 
            'gist_expires': gist_expires,
 
            'gist_updated': time.time(),
 
        }
 
        with open(os.path.join(repo.path, '.hg', GIST_METADATA_FILE), 'wb') as f:
 
            f.write(json.dumps(metadata))
 
            f.write(ascii_bytes(ext_json.dumps(metadata)))
 

	
 
    def get_gist(self, gist):
 
        return Gist.guess_instance(gist)
 

	
 
    def get_gist_files(self, gist_access_id, revision=None):
 
        """
kallithea/tests/api/api_base.py
Show inline comments
 
@@ -20,14 +20,15 @@ import os
 
import random
 
import re
 

	
 
import mock
 
import pytest
 

	
 
from kallithea.lib import ext_json
 
from kallithea.lib.auth import AuthUser
 
from kallithea.lib.compat import json
 
from kallithea.lib.utils2 import ascii_bytes
 
from kallithea.model.changeset_status import ChangesetStatusModel
 
from kallithea.model.db import ChangesetStatus, PullRequest, RepoGroup, Repository, Setting, Ui, User
 
from kallithea.model.gist import GistModel
 
from kallithea.model.meta import Session
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.repo_group import RepoGroupModel
 
@@ -45,25 +46,24 @@ TEST_REPO_GROUP = u'test_repo_group'
 
fixture = Fixture()
 

	
 

	
 
def _build_data(apikey, method, **kw):
 
    """
 
    Builds API data with given random ID
 

	
 
    :param random_id:
 
    For convenience, the json is returned as str
 
    """
 
    random_id = random.randrange(1, 9999)
 
    return random_id, json.dumps({
 
    return random_id, ext_json.dumps({
 
        "id": random_id,
 
        "api_key": apikey,
 
        "method": method,
 
        "args": kw
 
    })
 

	
 

	
 
jsonify = lambda obj: json.loads(json.dumps(obj))
 
jsonify = lambda obj: ext_json.loads(ext_json.dumps(obj))
 

	
 

	
 
def crash(*args, **kwargs):
 
    raise Exception('Total Crash !')
 

	
 

	
 
@@ -124,22 +124,22 @@ class _BaseTestApi(object):
 
    def _compare_ok(self, id_, expected, given):
 
        expected = jsonify({
 
            'id': id_,
 
            'error': None,
 
            'result': expected
 
        })
 
        given = json.loads(given)
 
        given = ext_json.loads(given)
 
        assert expected == given, (expected, given)
 

	
 
    def _compare_error(self, id_, expected, given):
 
        expected = jsonify({
 
            'id': id_,
 
            'error': expected,
 
            'result': None
 
        })
 
        given = json.loads(given)
 
        given = ext_json.loads(given)
 
        assert expected == given, (expected, given)
 

	
 
    def test_Optional_object(self):
 
        from kallithea.controllers.api.api import Optional
 

	
 
        option1 = Optional(None)
 
@@ -2319,22 +2319,21 @@ class _BaseTestApi(object):
 
        id_, params = _build_data(self.apikey_regular, 'create_gist',
 
                                  lifetime=10,
 
                                  description='foobar-gist',
 
                                  gist_type='public',
 
                                  files={'foobar': {'content': 'foo'}})
 
        response = api_call(self, params)
 
        response_json = response.json
 
        expected = {
 
            'gist': {
 
                'access_id': response_json['result']['gist']['access_id'],
 
                'created_on': response_json['result']['gist']['created_on'],
 
                'access_id': response.json['result']['gist']['access_id'],
 
                'created_on': response.json['result']['gist']['created_on'],
 
                'description': 'foobar-gist',
 
                'expires': response_json['result']['gist']['expires'],
 
                'gist_id': response_json['result']['gist']['gist_id'],
 
                'expires': response.json['result']['gist']['expires'],
 
                'gist_id': response.json['result']['gist']['gist_id'],
 
                'type': 'public',
 
                'url': response_json['result']['gist']['url']
 
                'url': response.json['result']['gist']['url']
 
            },
 
            'msg': 'created new gist'
 
        }
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(GistModel, 'create', crash)
 
@@ -2394,64 +2393,64 @@ class _BaseTestApi(object):
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_changesets(self):
 
        id_, params = _build_data(self.apikey, 'get_changesets',
 
                                  repoid=self.REPO, start=0, end=2)
 
        response = api_call(self, params)
 
        result = json.loads(response.body)["result"]
 
        result = ext_json.loads(response.body)["result"]
 
        assert len(result) == 3
 
        assert 'message' in result[0]
 
        assert 'added' not in result[0]
 

	
 
    def test_api_get_changesets_with_max_revisions(self):
 
        id_, params = _build_data(self.apikey, 'get_changesets',
 
                                  repoid=self.REPO, start_date="2011-02-24T00:00:00", max_revisions=10)
 
        response = api_call(self, params)
 
        result = json.loads(response.body)["result"]
 
        result = ext_json.loads(response.body)["result"]
 
        assert len(result) == 10
 
        assert 'message' in result[0]
 
        assert 'added' not in result[0]
 

	
 
    def test_api_get_changesets_with_branch(self):
 
        if self.REPO == 'vcs_test_hg':
 
            branch = 'stable'
 
        else:
 
            pytest.skip("skipping due to missing branches in git test repo")
 
        id_, params = _build_data(self.apikey, 'get_changesets',
 
                                  repoid=self.REPO, branch_name=branch, start_date="2011-02-24T00:00:00")
 
        response = api_call(self, params)
 
        result = json.loads(response.body)["result"]
 
        result = ext_json.loads(response.body)["result"]
 
        assert len(result) == 5
 
        assert 'message' in result[0]
 
        assert 'added' not in result[0]
 

	
 
    def test_api_get_changesets_with_file_list(self):
 
        id_, params = _build_data(self.apikey, 'get_changesets',
 
                                  repoid=self.REPO, start_date="2010-04-07T23:30:30", end_date="2010-04-08T00:31:14", with_file_list=True)
 
        response = api_call(self, params)
 
        result = json.loads(response.body)["result"]
 
        result = ext_json.loads(response.body)["result"]
 
        assert len(result) == 3
 
        assert 'message' in result[0]
 
        assert 'added' in result[0]
 

	
 
    def test_api_get_changeset(self):
 
        review = fixture.review_changeset(self.REPO, self.TEST_REVISION, "approved")
 
        id_, params = _build_data(self.apikey, 'get_changeset',
 
                                  repoid=self.REPO, raw_id=self.TEST_REVISION)
 
        response = api_call(self, params)
 
        result = json.loads(response.body)["result"]
 
        result = ext_json.loads(response.body)["result"]
 
        assert result["raw_id"] == self.TEST_REVISION
 
        assert "reviews" not in result
 

	
 
    def test_api_get_changeset_with_reviews(self):
 
        reviewobjs = fixture.review_changeset(self.REPO, self.TEST_REVISION, "approved")
 
        id_, params = _build_data(self.apikey, 'get_changeset',
 
                                  repoid=self.REPO, raw_id=self.TEST_REVISION,
 
                                  with_reviews=True)
 
        response = api_call(self, params)
 
        result = json.loads(response.body)["result"]
 
        result = ext_json.loads(response.body)["result"]
 
        assert result["raw_id"] == self.TEST_REVISION
 
        assert "reviews" in result
 
        assert len(result["reviews"]) == 1
 
        review = result["reviews"][0]
 
        expected = {
 
            'status': 'approved',
 
@@ -2481,18 +2480,18 @@ class _BaseTestApi(object):
 
        expected = u'Access denied to repo %s' % self.REPO
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_pullrequest(self):
 
        pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, u'get test')
 
        random_id = random.randrange(1, 9999)
 
        params = json.dumps({
 
        params = ascii_bytes(ext_json.dumps({
 
            "id": random_id,
 
            "api_key": self.apikey,
 
            "method": 'get_pullrequest',
 
            "args": {"pullrequest_id": pull_request_id},
 
        })
 
        }))
 
        response = api_call(self, params)
 
        pullrequest = PullRequest().get(pull_request_id)
 
        expected = {
 
            "status": "new",
 
            "pull_request_id": pull_request_id,
 
            "description": "No description",
 
@@ -2512,57 +2511,57 @@ class _BaseTestApi(object):
 
                         given=re.sub(br"\d\d\d\d\-\d\d\-\d\dT\d\d\:\d\d\:\d\d",
 
                                      b"2000-01-01T00:00:00", response.body))
 

	
 
    def test_api_close_pullrequest(self):
 
        pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, u'close test')
 
        random_id = random.randrange(1, 9999)
 
        params = json.dumps({
 
        params = ascii_bytes(ext_json.dumps({
 
            "id": random_id,
 
            "api_key": self.apikey,
 
            "method": "comment_pullrequest",
 
            "args": {"pull_request_id": pull_request_id, "close_pr": True},
 
        })
 
        }))
 
        response = api_call(self, params)
 
        self._compare_ok(random_id, True, given=response.body)
 
        pullrequest = PullRequest().get(pull_request_id)
 
        assert pullrequest.comments[-1].text == ''
 
        assert pullrequest.status == PullRequest.STATUS_CLOSED
 
        assert pullrequest.is_closed() == True
 

	
 
    def test_api_status_pullrequest(self):
 
        pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, u"status test")
 

	
 
        random_id = random.randrange(1, 9999)
 
        params = json.dumps({
 
        params = ascii_bytes(ext_json.dumps({
 
            "id": random_id,
 
            "api_key": User.get_by_username(base.TEST_USER_REGULAR2_LOGIN).api_key,
 
            "method": "comment_pullrequest",
 
            "args": {"pull_request_id": pull_request_id, "status": ChangesetStatus.STATUS_APPROVED},
 
        })
 
        }))
 
        response = api_call(self, params)
 
        pullrequest = PullRequest().get(pull_request_id)
 
        self._compare_error(random_id, "No permission to change pull request status. User needs to be admin, owner or reviewer.", given=response.body)
 
        assert ChangesetStatus.STATUS_UNDER_REVIEW == ChangesetStatusModel().calculate_pull_request_result(pullrequest)[2]
 
        params = json.dumps({
 
        params = ascii_bytes(ext_json.dumps({
 
            "id": random_id,
 
            "api_key": User.get_by_username(base.TEST_USER_REGULAR_LOGIN).api_key,
 
            "method": "comment_pullrequest",
 
            "args": {"pull_request_id": pull_request_id, "status": ChangesetStatus.STATUS_APPROVED},
 
        })
 
        }))
 
        response = api_call(self, params)
 
        self._compare_ok(random_id, True, given=response.body)
 
        pullrequest = PullRequest().get(pull_request_id)
 
        assert ChangesetStatus.STATUS_APPROVED == ChangesetStatusModel().calculate_pull_request_result(pullrequest)[2]
 

	
 
    def test_api_comment_pullrequest(self):
 
        pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, u"comment test")
 
        random_id = random.randrange(1, 9999)
 
        params = json.dumps({
 
        params = ascii_bytes(ext_json.dumps({
 
            "id": random_id,
 
            "api_key": self.apikey,
 
            "method": "comment_pullrequest",
 
            "args": {"pull_request_id": pull_request_id, "comment_msg": "Looks good to me"},
 
        })
 
        }))
 
        response = api_call(self, params)
 
        self._compare_ok(random_id, True, given=response.body)
 
        pullrequest = PullRequest().get(pull_request_id)
 
        assert pullrequest.comments[-1].text == u'Looks good to me'
0 comments (0 inline, 0 general)