Files
@ f6ac79182600
Branch filter:
Location: kallithea/pylons_app/lib/auth.py
f6ac79182600
4.3 KiB
text/x-python
Added rest controllers for repos and users,
templating changes + css fixes
templating changes + css fixes
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 | import sqlite3
import os
import logging
from os.path import dirname as dn
from datetime import datetime
import crypt
log = logging.getLogger(__name__)
ROOT = dn(dn(dn(os.path.realpath(__file__))))
def get_sqlite_conn_cur():
conn = sqlite3.connect(os.path.join(ROOT, 'auth.sqlite'))
cur = conn.cursor()
return conn, cur
def admin_auth(username, password):
conn, cur = get_sqlite_conn_cur()
password_crypt = crypt.crypt(password, '6a')
try:
cur.execute("SELECT * FROM users WHERE username=?", (username,))
data = cur.fetchone()
except sqlite3.OperationalError as e:
data = None
log.error(e)
if data:
if data[3]:
if data[1] == username and data[2] == password_crypt and data[4]:
log.info('user %s authenticated correctly', username)
return True
else:
log.error('user %s is disabled', username)
return False
def authfunc(environ, username, password):
conn, cur = get_sqlite_conn_cur()
password_crypt = crypt.crypt(password, '6a')
try:
cur.execute("SELECT * FROM users WHERE username=?", (username,))
data = cur.fetchone()
except sqlite3.OperationalError as e:
data = None
log.error(e)
if data:
if data[3]:
if data[1] == username and data[2] == password_crypt:
log.info('user %s authenticated correctly', username)
if environ:
http_accept = environ.get('HTTP_ACCEPT')
if http_accept.startswith('application/mercurial') or \
environ['PATH_INFO'].find('raw-file') != -1:
cmd = environ['PATH_INFO']
for qry in environ['QUERY_STRING'].split('&'):
if qry.startswith('cmd'):
cmd += "|" + qry
try:
cur.execute('''INSERT INTO
user_logs
VALUES(?,?,?,?)''',
(None, data[0], cmd, datetime.now()))
conn.commit()
except Exception as e:
conn.rollback()
log.error(e)
return True
else:
log.error('user %s is disabled', username)
return False
def create_user_table():
'''
Create a auth database
'''
conn, cur = get_sqlite_conn_cur()
try:
log.info('creating table %s', 'users')
cur.execute('''DROP TABLE IF EXISTS users ''')
cur.execute('''CREATE TABLE users
(id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT,
password TEXT,
active INTEGER,
admin INTEGER)''')
log.info('creating table %s', 'user_logs')
cur.execute('''DROP TABLE IF EXISTS user_logs ''')
cur.execute('''CREATE TABLE user_logs
(id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER,
last_action TEXT,
last_action_date DATETIME)''')
conn.commit()
except:
conn.rollback()
raise
cur.close()
def create_user(username, password, admin=False):
conn, cur = get_sqlite_conn_cur()
password_crypt = crypt.crypt(password, '6a')
log.info('creating user %s', username)
try:
cur.execute('''INSERT INTO users values (?,?,?,?,?) ''',
(None, username, password_crypt, 1, admin))
conn.commit()
except:
conn.rollback()
raise
if __name__ == "__main__":
create_user_table()
create_user('marcink', 'qweqwe', True)
create_user('lukaszd', 'qweqwe')
create_user('adriand', 'qweqwe')
create_user('radek', 'qweqwe')
create_user('skrzeka', 'qweqwe')
create_user('bart', 'qweqwe')
create_user('maho', 'qweqwe')
create_user('michalg', 'qweqwe')
#authfunc('', 'marcink', 'qweqwe')
|