Commit db8c389

Merge pull request #120 from rakitzis/crawl_parallel

Parallelize get_tree_size

2 parents 2049cde + bc3ed6a

1 file changed: diskover/diskover.py (96 additions, 115 deletions)
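In short: the previous crawl parallelized only at the top, with one thread for the root directory plus one thread per level-1 subdirectory, so thread utilization was fixed by the shape of the top level. This change makes get_tree_size itself parallel: each recursion point may hand a subdirectory off to a shared ThreadPoolExecutor while a global budget of spare threads (crawl_thread_budget) lasts, and falls back to recursing in the current thread otherwise. The per-crawl docs list is replaced by a global docs_buffer sharded by thread label, flushed to Elasticsearch whenever a shard reaches config['ES_CHUNKSIZE'] and once more when the crawl finishes.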
@@ -75,9 +75,11 @@ def install_win_sig_handler():
 bulktime = {}
 warnings = 0
 scan_paths = []
+docs_buffer = {}
 
 crawl_thread_lock = Lock()
 crawl_tree_queue = Queue()
+crawl_thread_budget = 0
 
 quit = False
 emptyindex = False
@@ -176,13 +178,33 @@ def receive_signal(signum, frame):
     close_app()
     sys.exit(signum)
 
-
-def start_bulk_upload(thread, root, docs, doccount):
+def append_doc(thread, root, doc):
+    """Append doc to docs list and upload to ES in batches. Sharded by thread."""
+    global docs_buffer
+    with crawl_thread_lock:
+        if thread not in docs_buffer:
+            docs_buffer[thread] = []
+        docs_buffer[thread].append(doc)
+        docs = docs_buffer[thread]
+        if len(docs) < config['ES_CHUNKSIZE']:
+            return
+        docs_buffer[thread] = []
+    start_bulk_upload(thread, root, docs)
+
+def flush_docs_buffer(thread, root):
+    """Flush docs buffer and upload to ES. Sharded by thread."""
+    global docs_buffer
+    with crawl_thread_lock:
+        docs = docs_buffer[thread]
+        docs_buffer[thread] = []
+    start_bulk_upload(thread, root, docs)
+
+def start_bulk_upload(thread, root, docs):
     """Bulk uploads docs to es index."""
     global bulktime
 
     if DEBUG:
-        logger.debug('[{0}] bulk uploading {1} docs to ES...'.format(thread, doccount))
+        logger.debug('[{0}] bulk uploading {1} docs to ES...'.format(thread, len(docs)))
     es_upload_start = time.time()
     try:
         bulk_upload(es, options.index, docs)
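The two new helpers split batching out of start_bulk_upload: append_doc buffers a doc under its thread's shard and triggers an upload once the shard fills, flush_docs_buffer drains whatever remains. A minimal self-contained sketch of that pattern, where send_batch and CHUNK are illustrative stand-ins for bulk_upload and config['ES_CHUNKSIZE'], not diskover's API:

    from threading import Lock, current_thread

    CHUNK = 3             # stand-in for config['ES_CHUNKSIZE']
    buffer_lock = Lock()  # stand-in for crawl_thread_lock
    docs_buffer = {}      # shard key -> docs waiting to be uploaded

    def send_batch(docs):
        # stand-in for bulk_upload(es, options.index, docs)
        print('uploading {0} docs'.format(len(docs)))

    def append_doc(doc):
        """Buffer doc in the calling thread's shard; upload full batches."""
        shard = current_thread().name
        with buffer_lock:
            docs_buffer.setdefault(shard, []).append(doc)
            if len(docs_buffer[shard]) < CHUNK:
                return
            docs, docs_buffer[shard] = docs_buffer[shard], []
        send_batch(docs)  # upload happens outside the lock

    def flush_docs_buffer():
        """Upload whatever the calling thread's shard still holds."""
        shard = current_thread().name
        with buffer_lock:
            docs, docs_buffer[shard] = docs_buffer.get(shard, []), []
        if docs:
            send_batch(docs)

    for i in range(7):
        append_doc({'doc': i})
    flush_docs_buffer()  # prints batches of 3, 3, then the final 1

Note that in the diff the shard key is the thread label passed down through get_tree_size, not the worker thread's own name, so docs produced by executor workers still accumulate in the submitting crawl's shard; crawl_thread_lock keeps the shared dict consistent either way.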
@@ -194,12 +216,9 @@ def start_bulk_upload(thread, root, docs, doccount):
     es_upload_time = time.time() - es_upload_start
     if DEBUG:
         logger.debug('[{0}] bulk uploading {1} docs completed in {2:.3f}s'.format(
-            thread, doccount, es_upload_time))
+            thread, len(docs), es_upload_time))
     with crawl_thread_lock:
         bulktime[root] += es_upload_time
-
-    return doccount
-
 
 def log_stats_thread(root):
     """Shows crawl and es upload stats."""
@@ -235,7 +254,7 @@ def log_stats_thread(root):
         root, total_doc_count[root], elapsed, dps, dps_max, dps_min, dps_avg))
 
 
-def get_tree_size(thread, root, top, path, docs, sizes, inodes, depth=0, maxdepth=999):
+def get_tree_size(executor, thread, root, top, path, sizes, inodes, depth=0, maxdepth=999):
     """Return total size of all files in directory tree at path."""
     global filecount
     global skipfilecount
@@ -244,6 +263,7 @@ def get_tree_size(thread, root, top, path, docs, sizes, inodes, depth=0, maxdepth=999):
     global skipdircount
     global total_doc_count
     global warnings
+    global crawl_thread_budget
 
     size = 0
     size_du = 0
@@ -259,7 +279,7 @@ def get_tree_size(thread, root, top, path, docs, sizes, inodes, depth=0, maxdepth=999):
     size_du_norecurs = 0
     files_norecurs = 0
     dirs_norecurs = 0
-
+
     # use alt scanner
     # try to get stat info for dir path
     if options.altscanner:
@@ -296,6 +316,7 @@ def get_tree_size(thread, root, top, path, docs, sizes, inodes, depth=0, maxdepth=999):
         return 0, 0, 0, 0
 
     # scan directory
+    crawl_futures = []
     try:
         if DEBUG:
             logger.debug('[{0}] Scanning path {1}...'.format(thread, path))
@@ -326,11 +347,22 @@ def get_tree_size(thread, root, top, path, docs, sizes, inodes, depth=0, maxdepth=999):
                     if depth < maxdepth:
                         # recurse into subdir
                         if not quit:
-                            s, sdu, fc, dc = get_tree_size(thread, root, top, dir_path, docs, sizes, inodes, depth+1, maxdepth)
-                            size += s
-                            size_du += sdu
-                            files += fc
-                            dirs += dc
+                            with crawl_thread_lock:
+                                can_submit = crawl_thread_budget > 0
+                                if can_submit:
+                                    crawl_thread_budget -= 1
+
+                            if can_submit:
+                                # if a spare thread is available, do the subdir crawl concurrently
+                                future = executor.submit(get_tree_size, executor, thread, root, top, dir_path, sizes, inodes, depth+1, maxdepth)
+                                crawl_futures.append(future)
+                            else:
+                                # fallback: do the crawl in the same thread recursively
+                                s, sdu, fc, dc = get_tree_size(executor, thread, root, top, dir_path, sizes, inodes, depth+1, maxdepth)
+                                size += s
+                                size_du += sdu
+                                files += fc
+                                dirs += dc
                     else:
                         if DEBUG:
                             logger.debug('[{0}] not descending {1}, maxdepth {2} reached'.format(
@@ -344,6 +376,21 @@ def get_tree_size(thread, root, top, path, docs, sizes, inodes, depth=0, maxdepth=999):
                     if options.verbose or options.vverbose:
                         logger.info('[{0}] skipping dir {1}'.format(thread, entry.path))
                     d_skip_count += 1
+                # poll for completed subdir crawls and collect results
+                if crawl_futures:
+                    completed = []
+                    for future in crawl_futures:
+                        if future.done():
+                            s, sdu, fc, dc = future.result()
+                            size += s
+                            size_du += sdu
+                            files += fc
+                            dirs += dc
+                            completed.append(future)
+                    for future in completed:
+                        crawl_futures.remove(future)
+                    with crawl_thread_lock:
+                        crawl_thread_budget += len(completed)
             else:
                 f_count += 1
                 if not file_excluded(entry.name):
@@ -517,13 +564,8 @@ def get_tree_size(thread, root, top, path, docs, sizes, inodes, depth=0, maxdepth=999):
                                warnings += 1
                            pass
                    # add file doc to docs list and upload to ES once it reaches certain size
-                    docs.append(data.copy())
-                    doc_count = len(docs)
-                    if doc_count >= config['ES_CHUNKSIZE']:
-                        doc_count = start_bulk_upload(thread, root, docs, doc_count)
-                        tot_doc_count += doc_count
-                        docs.clear()
-
+                    append_doc(thread, root, data.copy())
+                    tot_doc_count += 1
                else:
                    f_skip_count += 1
                    if DEBUG:
@@ -549,6 +591,17 @@ def get_tree_size(thread, root, top, path, docs, sizes, inodes, depth=0, maxdepth=999):
                    if options.verbose or options.vverbose:
                        logger.info('[{0}] file name excluded, skipping file {1}'.format(thread, entry.path))
 
+    # wait for any remaining subdir crawls to complete
+    if crawl_futures:
+        for future in crawl_futures:
+            s, sdu, fc, dc = future.result()
+            size += s
+            size_du += sdu
+            files += fc
+            dirs += dc
+        with crawl_thread_lock:
+            crawl_thread_budget += len(crawl_futures)
+        crawl_futures = []
    # if not excluding empty dirs is set or exclude empty dirs is set but there are files or
    # dirs in the current directory, index the dir
    if not config['EXCLUDES_EMPTYDIRS'] or (config['EXCLUDES_EMPTYDIRS'] and (files > 0 or dirs > 0)):
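The parallel walk is now complete in three pieces: budget-guarded submission at the recursion point, opportunistic polling inside the scan loop, and the blocking drain above before the directory's own doc is built. A self-contained sketch of the same pattern, with illustrative names (walk, budget, MAX_WORKERS) standing in for get_tree_size, crawl_thread_budget, and maxthreads, and counting entries instead of building ES docs:

    import os
    from concurrent import futures
    from threading import Lock

    MAX_WORKERS = 8
    budget_lock = Lock()   # stand-in for crawl_thread_lock
    budget = MAX_WORKERS   # spare pool threads, like crawl_thread_budget

    def walk(executor, path):
        """Return (files, dirs) under path, fanning out while the budget lasts."""
        global budget
        files = dirs = 0
        pending = []  # futures for subdir walks running concurrently
        for entry in os.scandir(path):
            if entry.is_dir(follow_symlinks=False):
                dirs += 1
                with budget_lock:
                    can_submit = budget > 0
                    if can_submit:
                        budget -= 1
                if can_submit:
                    # a spare pool thread exists: walk the subdir concurrently
                    pending.append(executor.submit(walk, executor, entry.path))
                else:
                    # pool saturated: recurse in the current thread instead
                    fc, dc = walk(executor, entry.path)
                    files, dirs = files + fc, dirs + dc
                # poll finished futures so their threads rejoin the budget early
                done = [f for f in pending if f.done()]
                for f in done:
                    fc, dc = f.result()
                    files, dirs = files + fc, dirs + dc
                    pending.remove(f)
                with budget_lock:
                    budget += len(done)
            elif entry.is_file(follow_symlinks=False):
                files += 1
        # drain anything still running before reporting this level's totals
        for f in pending:
            fc, dc = f.result()
            files, dirs = files + fc, dirs + dc
        with budget_lock:
            budget += len(pending)
        return files, dirs

    with futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        print(walk(executor, '.'))

The budget is what makes this safe: at most MAX_WORKERS subdir walks are ever outstanding, so a submitted walk always gets a pool thread instead of queueing behind blocked parents, and the same-thread fallback keeps the crawl progressing once the pool is saturated.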
@@ -679,13 +732,8 @@ def get_tree_size(thread, root, top, path, docs, sizes, inodes, depth=0, maxdepth=999):
 
    if depth > 0:
        # add file doc to docs list and upload to ES once it reaches certain size
-        docs.append(data.copy())
-        doc_count = len(docs)
-        if doc_count >= config['ES_CHUNKSIZE']:
-            doc_count = start_bulk_upload(thread, root, docs, doc_count)
-            tot_doc_count += doc_count
-            docs.clear()
-
+        append_doc(thread, root, data.copy())
+        tot_doc_count += 1
    else:
        with crawl_thread_lock:
            sizes[root] = data.copy()
@@ -729,6 +777,12 @@ def get_tree_size(thread, root, top, path, docs, sizes, inodes, depth=0, maxdepth=999):
        with crawl_thread_lock:
            warnings += 1
        pass
+    # post-exception cleanup:
+    if crawl_futures:
+        for future in crawl_futures:
+            future.result()  # wait for subdir crawl to finish; discard results
+        with crawl_thread_lock:
+            crawl_thread_budget += len(crawl_futures)
 
    return size, size_du, files, dirs
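Worth noting in this last hunk: even when the scan loop dies on an exception, any subdirectory futures already submitted are joined and their budget units are returned before the function exits, so a failed directory cannot strand pool threads or leak budget.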

@@ -746,111 +800,38 @@ def crawl(root):
     sizes = {}
     inodes = set()
 
-    def crawl_thread(root, top, depth, maxdepth, sizes, inodes):
+    def crawl_tree(root, top, maxdepth, sizes, inodes):
         global total_doc_count
         global scan_paths
+        global crawl_thread_budget
         thread = current_thread().name
 
         crawl_start = time.time()
-        docs = []
         with crawl_thread_lock:
             scan_paths.append(top)
         if DEBUG:
-            logger.debug('[{0}] starting crawl {1} (depth {2}, maxdepth {3})...'.format(thread, top, depth, maxdepth))
+            logger.debug('[{0}] starting crawl {1} (depth {2}, maxdepth {3})...'.format(thread, top, 0, maxdepth))
         if options.verbose or options.vverbose:
-            logger.info('[{0}] starting crawl {1} (depth {2}, maxdepth {3})...'.format(thread, top, depth, maxdepth))
-        size, size_du, file_count, dir_count = get_tree_size(thread, root, top, top, docs, sizes, inodes, depth, maxdepth)
-        doc_count = len(docs)
-        if doc_count > 0:
-            doc_count = start_bulk_upload(thread, root, docs, doc_count)
-            with crawl_thread_lock:
-                total_doc_count[root] += doc_count
-            docs.clear()
-        # Add sizes of subdir to root dir
-        if depth > 0:
-            with crawl_thread_lock:
-                sizes[top] = {
-                    'size': size,
-                    'size_du': size_du,
-                    'file_count': file_count,
-                    'dir_count': dir_count
-                }
-            if size > 0:
-                with crawl_thread_lock:
-                    sizes[root]['size'] += sizes[top]['size']
-                    sizes[root]['size_du'] += sizes[top]['size_du']
-                    sizes[root]['dir_count'] += sizes[top]['dir_count']
-                    sizes[root]['file_count'] += sizes[top]['file_count']
+            logger.info('[{0}] starting crawl {1} (depth {2}, maxdepth {3})...'.format(thread, top, 0, maxdepth))
+        with futures.ThreadPoolExecutor(max_workers=maxthreads) as executor:
+            crawl_thread_budget = maxthreads
+            size, size_du, file_count, dir_count = get_tree_size(executor, thread, root, top, top, sizes, inodes, 0, maxdepth)
+        flush_docs_buffer(thread, root)
 
         crawl_time = get_time(time.time() - crawl_start)
         logger.info('[{0}] finished crawling {1} ({2} dirs, {3} files, {4}) in {5}'.format(
             thread, top, dir_count, file_count, convert_size(size), crawl_time))
         with crawl_thread_lock:
             scan_paths.remove(top)
 
-
     scandir_walk_start = time.time()
-
-    # find all subdirs at level 1
-    subdir_list = []
     try:
-        if DEBUG:
-            logger.debug('Scanning path {0}...'.format(root))
-        if options.verbose or options.vverbose:
-            logger.info('Scanning path {0}...'.format(root))
-        for entry in os.scandir(root):
-            if DEBUG:
-                logger.debug('Scanning dir entry {0}...'.format(entry.path))
-            if options.vverbose:
-                logger.info('Scanning dir entry {0}...'.format(entry.path))
-            if entry.is_symlink():
-                pass
-            elif entry.is_dir():
-                if IS_WIN and options.altscanner is None:
-                    dir_path = rem_win_path(entry.path)
-                else:
-                    dir_path = entry.path
-                if not dir_excluded(dir_path):
-                    subdir_list.append(dir_path)
-                else:
-                    if DEBUG:
-                        logger.debug('dir excluded, skipping dir {0}'.format(entry.path))
-                    if options.verbose or options.vverbose:
-                        logger.info('dir excluded, skipping dir {0}'.format(entry.path))
-                    skipdircount[root] += 1
-    except OSError as e:
-        logmsg = 'OS ERROR: {0}'.format(e)
-        logger.warning(logmsg)
-        if config['LOGTOFILE']: logger_warn.warning(logmsg)
-        warnings += 1
-        pass
-    if len(subdir_list) > 0:
-        logger.info('found {0} subdirs at level 1, starting threads...'.format(len(subdir_list)))
-    else:
-        logger.info('found 0 subdirs at level 1')
-
-    with futures.ThreadPoolExecutor(max_workers=maxthreads) as executor:
-        # Set up thread to crawl rootdir (not recursive)
-        future = executor.submit(crawl_thread, root, root, 0, 0, sizes, inodes)
-        try:
-            data = future.result()
-        except Exception as e:
-            logmsg = 'FATAL ERROR: an exception has occurred: {0}'.format(e)
-            logger.critical(logmsg, exc_info=1)
-            if config['LOGTOFILE']: logger_warn.critical(logmsg, exc_info=1)
-            close_app_critical_error()
-
-        # Set up threads to crawl (recursive) from each of the level 1 subdirs
-        futures_subdir = {executor.submit(crawl_thread, root, subdir, 1, options.maxdepth, sizes, inodes): subdir for subdir in subdir_list}
-        for future in futures.as_completed(futures_subdir):
-            try:
-                data = future.result()
-            except Exception as e:
-                logmsg = 'FATAL ERROR: an exception has occurred: {0}'.format(e)
-                logger.critical(logmsg, exc_info=1)
-                if config['LOGTOFILE']: logger_warn.critical(logmsg, exc_info=1)
-                close_app_critical_error()
-
+        crawl_tree(root, root, options.maxdepth, sizes, inodes)
+    except Exception as e:
+        logmsg = 'FATAL ERROR: an exception has occurred: {0}'.format(e)
+        logger.critical(logmsg, exc_info=1)
+        if config['LOGTOFILE']: logger_warn.critical(logmsg, exc_info=1)
+        close_app_critical_error()
    scandir_walk_time = time.time() - scandir_walk_start
    end_time = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
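The net effect on crawl(): the level-1 pre-scan, the per-subdir thread fan-out, and the explicit roll-up of each subdir's sizes into sizes[root] all disappear (the recursion now aggregates those totals directly). A single crawl_tree call owns the executor, seeds crawl_thread_budget with maxthreads, and flushes the doc buffer after the walk returns, so parallelism follows the actual shape of the tree rather than the number of top-level subdirectories.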
