diff --git a/rhodecode/model/license.py b/rhodecode/model/license.py new file mode 100644 --- /dev/null +++ b/rhodecode/model/license.py @@ -0,0 +1,163 @@ +# -*- coding: utf-8 -*- +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +rhodecode.model.license +~~~~~~~~~~~~~~~~~~~~~~~ + +Model for licenses + + +:created_on: Aug 1, 2013 +:author: marcink +:copyright: (c) 2013 RhodeCode GmbH. +:license: GPLv3, see LICENSE for more details. +""" + +import uuid +import base64 +import time +import logging +import traceback + +from Crypto import Random +from Crypto.Cipher import AES + +import rhodecode +from rhodecode.lib.compat import json +from rhodecode.model import BaseModel +from rhodecode.model.db import RhodeCodeSetting + +log = logging.getLogger(__name__) + +BLOCK_SIZE = 32 + + +def pad(s): + return (s + (BLOCK_SIZE - len(s) % BLOCK_SIZE) * + chr(BLOCK_SIZE - len(s) % BLOCK_SIZE)) + + +def unpad(s): + return s[0:-ord(s[-1])] + + +class LicenseModel(BaseModel): + + cls = RhodeCodeSetting + + def __init__(self, sa=None, key=None): + super(LicenseModel, self).__init__(sa=sa) + if not key: + key = rhodecode.CONFIG.get('license_token') + + if not key: + raise TypeError('Missing encryption key: %s' % key) + if not isinstance(key, basestring): + raise TypeError('Encryption key is bad type, got %s ' + 'expected basestring' % type(key)) + #strip dashes + self.key = key.replace('-', '') + + @classmethod + def generate_license_token(cls): + tok_len = 4 + hex_token = uuid.uuid1().hex + num_pools = [hex_token[start:start+tok_len] + for start in range(0, len(hex_token), tok_len)] + return u'-'.join(num_pools[:4]) + + @classmethod + def get_default_license(cls): + + key = 'abra-cada-bra1-rce3' + enc_license_key = (''' + 0HBgVSMJgapshyHBZGiSj/pqSLkO8BmkbSnFDw8j+e854FkSJuVyUhIyUSFgs5mJ+P + KGknkw/LOf0pKLRZ7EZiFW+lkrv99UaS395FCp4J+ymdg7YdIdYtc0FZ8TcZjXrV3y + ORR3Klsjf7PUV6XVYSU6CAMx5SyK78n3JwYLclCnog1lD4kwHEFJH+OD5jFG3OT9eN + Kd0c1rVIJbbkPfppVIa4twr52bXWjueOpamdFI7DOJtlse9XyY1axgGNVQDtgRlWi5 + YmtP8OfT+Rq9SaCMQzPq3R42/YWHHrG1fhEMO7DAuruyLASoXROAo+hZmDoWYiqOP2 + vtJk/vc6WTB0Ro5zBWznFvkvTPzpFNf+A4FlQzQrVLjVOdIKncyJLx7QFHYFWT9ewD + Abr3XpaN5brqe97hDslN/8uKZabUydYI4dKDYkuc5+WAOfPGuqM/CwIPjZQ6K8ivJK + yX3CHBThwQuJtQY85GHPte0fT0bHoQyLIBwwYa76/pdm4eTaUrMZbj2HipDGTRO6BU + DaDIdw5YiczQ+Jec5phmqwbJ5Z/uXzdV6dhFLDTaiQ+PSkRLg9F1/cPZZxYOo8Jatn + 6pSQvmzi4ALTsaPIGGyu5aazPRB0Wz7g2tyPfUcAP5rzS0aWIdoszsAXizBiJdKgr4 + X2SlOqJ3MYfen4rvLbIQwV2IiRJdtv1QoFGAyyGfDtGzYruZbpfQcouhwRJbaESTwB + 0WXMa73Q2jw59GiTB5C4U=''') + return json.loads(LicenseModel(key=key).decrypt(enc_license_key)) + + @classmethod + def get_license_info(cls, license_token, enc_license_key, safe=True, + fill_defaults=False): + license_info = {} + if fill_defaults: + license_info = cls.get_default_license() + try: + if license_token and enc_license_key: + license_info = json.loads( + LicenseModel(key=license_token).decrypt(enc_license_key)) + except Exception, e: + log.error(traceback.format_exc()) + if not safe: + raise + return license_info + + @classmethod + def get_license_key(cls): + defaults = RhodeCodeSetting.get_app_settings() + return defaults.get('rhodecode_license_key') + + def encrypt(self, text, key=None): + if not isinstance(text, basestring): + raise TypeError('Encrypt can only work on unicode or ' + 'string, got %s' % type(text)) + if not key: + key = self.key + padded_text = pad(text) + IV = Random.new().read(AES.block_size) + cipher = AES.new(key, AES.MODE_CBC, IV) + return base64.b64encode(IV + cipher.encrypt(padded_text)) + + def decrypt(self, enc_text, key=None): + if not isinstance(enc_text, (unicode, str)): + raise TypeError('Encrypt can only work on unicode or ' + 'string, got %s' % type(enc_text)) + if not key: + key = self.key + enc = base64.b64decode(enc_text) + iv = enc[:16] # iv is stored + cipher = AES.new(key, AES.MODE_CBC, iv) + return unpad(cipher.decrypt(enc[16:])) + + def generate_signature(self, license_key, sig_key): + copy = license_key.copy() + del copy['signature'] + return self.encrypt(json.dumps(copy), key=sig_key) + + def verify(self, enc_text, sig_key): + if not isinstance(enc_text, basestring): + raise TypeError('Encrypt can only work on unicode or ' + 'string, got %s' % type(enc_text)) + + decrypted = json.loads(self.decrypt(enc_text)) + try: + signature = json.loads(self.decrypt(decrypted['signature'], sig_key)) + except Exception: + signature = '-- decryption error --' + + del decrypted['signature'] + #TODO: write better diff display + if decrypted != signature: + raise TypeError('Signature mismatch got %s[%s] vs %s[%s]' + % (decrypted, type(decrypted), signature, type(signature))) + return signature