Files
@ 9bedaa073c23
Branch filter:
Location: kallithea/pylons_app/lib/indexers/multiprocessing_indexer.py - annotation
9bedaa073c23
5.3 KiB
text/x-python
fixed lock decorator to return the executed function's data
removed print
removed print
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 | b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b 
b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b b153a51b1d3b | from multiprocessing import Process, Queue, cpu_count, Lock
import socket, sys
import time
import os
import sys
from os.path import dirname as dn
from multiprocessing.dummy import current_process
from shutil import rmtree
sys.path.append(dn(dn(dn(os.path.realpath(__file__)))))
from pylons_app.model.hg_model import HgModel
from whoosh.analysis import RegexTokenizer, LowercaseFilter, StopFilter
from whoosh.fields import TEXT, ID, STORED, Schema
from whoosh.index import create_in, open_dir
from datetime import datetime
from multiprocessing.process import current_process
from multiprocessing import Array, Value
#project root: three levels up from this file
root = dn(dn(os.path.dirname(os.path.abspath(__file__))))
#directory where the whoosh index files are stored
idx_location = os.path.join(root, 'data', 'index')
#TODO: hardcoded developer path -- should come from configuration
root_path = '/home/marcink/python_workspace_dirty/*'
#binary/generated extensions: such files are indexed by name only, their
#content is never read (see add_doc). Duplicate 'dll' entry removed.
exclude_extensions = ['pyc', 'mo', 'png', 'jpg', 'jpeg', 'gif', 'swf',
                      'dll', 'ttf', 'psd', 'svg', 'pdf', 'bmp']
#analyzer used for the content field of the index schema
my_analyzer = RegexTokenizer() | LowercaseFilter()
def scan_paths(root_location):
    """
    Discover mercurial repositories under *root_location*.

    @param root_location: glob/path to scan for repositories
    @return: mapping of repository name -> repository object
    """
    repositories = HgModel.repo_scan('/', root_location, None, True)
    return repositories
def index_paths(root_dir):
    """
    Walk *root_dir* and collect the paths of all files, skipping anything
    that lives under a path containing '.hg' (mercurial metadata store).

    @param root_dir: directory to walk recursively
    @return: set of file paths found
    """
    index_paths_ = set()
    for path, dirs, files in os.walk(root_dir):
        #prune excluded directories in-place so os.walk never descends
        #into them -- same result set as filtering afterwards, but avoids
        #walking the (potentially large) .hg store at all
        dirs[:] = [d for d in dirs if os.path.join(path, d).find('.hg') == -1]
        if path.find('.hg') == -1:
            for f in files:
                index_paths_.add(os.path.join(path, f))
    return index_paths_
def get_schema():
    """
    Build the whoosh schema used for the repository file index.

    @return: Schema with owner/repository/path/content/modtime fields
    """
    schema = Schema(
        owner=TEXT(),
        repository=TEXT(stored=True),
        path=ID(stored=True, unique=True),
        content=TEXT(stored=True, analyzer=my_analyzer),
        modtime=STORED())
    return schema
def add_doc(writer, path, repo_name, contact):
    """
    Add a single file as a document to the given index writer.

    @param writer: whoosh index writer
    @param path: filesystem path of the file to index
    @param repo_name: name of the repository the file belongs to
    @param contact: repository owner/contact stored in the owner field
    """
    #we don't want to read content of excluded file extensions,
    #those are still indexed, just with an empty content field
    if path.split('/')[-1].split('.')[-1].lower() not in exclude_extensions:
        fobj = open(path, 'rb')
        try:
            content = fobj.read()
        finally:
            #always release the file handle, even when read() raises
            fobj.close()
        try:
            u_content = unicode(content)
        except UnicodeDecodeError:
            #in case we have a decode error just represent the content
            #as an escaped byte string instead of dropping the file
            u_content = unicode(str(content).encode('string_escape'))
    else:
        u_content = u''
    writer.add_document(repository=u"%s" % repo_name,
                        owner=unicode(contact),
                        path=u"%s" % path,
                        content=u_content,
                        modtime=os.path.getmtime(path))
class MultiProcessIndexer(object):
    """ multiprocessing whoosh indexer """

    def __init__(self, idx, work_set=None, nr_processes=cpu_count()):
        """
        Fill a shared queue with work items and spawn worker processes.

        @param idx: whoosh index the workers write to
        @param work_set: iterable of (path, repo_name, contact) tuples;
            None means no work (was a shared mutable default ``set()``)
        @param nr_processes: number of worker processes to start
        """
        #NOTE: the old signature used a mutable default argument
        #(work_set=set()) which is shared across calls; use a None
        #sentinel instead -- backward compatible for all callers
        if work_set is None:
            work_set = set()
        q = Queue()
        l = Lock()
        for q_task in work_set:
            q.put(q_task)
        q.put('COMMIT')
        #to stop all processes we have to put one STOP marker per
        #process so each worker loop can break out
        for _ in xrange(nr_processes):
            q.put('STOP')
        for _ in xrange(nr_processes):
            p = Process(target=self.work_func, args=(q, l, idx))
            p.start()

    def work_func(self, q, l, idx, writer=None):
        """
        Worker body executed in each child process: drains the queue and
        indexes every (path, repo_name, contact) tuple until STOP.

        @param q: shared task queue
        @param l: shared lock serializing stdout output and commits
        @param idx: whoosh index -- each worker opens its own writer
        @param writer: ignored, kept for backward compatibility; the
            worker always creates its own writer from *idx*
        """
        writer = idx.writer()
        while True:
            q_task = q.get()
            proc = current_process()
            if q_task == 'STOP':
                sys.stdout.write('%s STOP\n' % proc._name)
                break
            if q_task != 'COMMIT':
                #serialize stdout so progress lines don't interleave
                l.acquire()
                sys.stdout.write(' >> %s %s %s @ ' % q_task)
                sys.stdout.write(' %s \n' % proc._name)
                l.release()
                add_doc(writer, q_task[0], q_task[1], q_task[2])
        #single commit after this worker drained its share of the queue
        l.acquire()
        writer.commit(merge=True)
        l.release()
if __name__ == "__main__":
    #build queue
    #any command line argument selects "build index" mode; no arguments
    #means "dump the existing index" mode
    do = True if len(sys.argv) > 1 else False
    q_tasks = []
    #wipe any previous index and recreate an empty index directory
    if os.path.exists(idx_location):
        rmtree(idx_location)
    if not os.path.exists(idx_location):
        os.mkdir(idx_location)
    idx = create_in(idx_location, get_schema() , indexname='HG_INDEX')
    if do:
        sys.stdout.write('Building queue...')
        for cnt, repo in enumerate(scan_paths(root_path).values()):
            #NOTE(review): only the 'evoice_py' repository is indexed --
            #looks like a leftover debugging filter, confirm before removal
            if repo.name != 'evoice_py':
                continue
            #one task per file: (file path, repo name, repo contact)
            q_tasks.extend([(idx_path, repo.name, repo.contact) for idx_path in index_paths(repo.path)])
            #stop after the first few repositories (debug limit)
            if cnt == 4:
                break
        sys.stdout.write('done\n')
        mpi = MultiProcessIndexer(idx, q_tasks)
    else:
        #no arguments: print the stored path of every document in the index
        print 'checking index'
        reader = idx.reader()
        all = reader.all_stored_fields()
        for fields in all:
            print fields['path']
|