
Commit bc3ed6a

Parallelize get_tree_size
This commit replaces the previous scheme of assigning one crawl thread per top-level subdirectory and instead attempts to hand EVERY subdir crawl to a new thread. At the same time a thread budget (maxthreads) is imposed: when the number of outstanding crawlers equals maxthreads, new directories are crawled recursively in the same crawl thread. Each iteration of the crawler loop now polls the outstanding subdir crawlers and harvests the ones that have completed, so new subdirectories can be crawled in parallel as the thread budget allows.

This commit also changes the accumulation of ES documents from a stack-resident parameter ("docs") to a global, thread-sharded map of lists, which is flushed to ES whenever the chunk size is reached. This simplifies the bookkeeping and keeps the buffer off the stack, so subdir crawls can be launched and harvested without perturbing this state.
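For illustration, the scheduling pattern described above — submit a subdir crawl to a worker thread only while a shared budget has headroom, otherwise recurse inline, and periodically harvest finished futures so their slots return to the budget — can be sketched in isolation roughly as follows. This is a minimal standalone sketch, not the diskover code; the names (MAXTHREADS, budget, budget_lock, crawl) are illustrative only.

import os
from concurrent.futures import ThreadPoolExecutor
from threading import Lock

MAXTHREADS = 4
budget = MAXTHREADS        # spare worker threads available for subdir crawls
budget_lock = Lock()

def crawl(executor, path):
    """Return total size of files under path, crawling subdirs in parallel when the budget allows."""
    global budget
    total = 0
    pending = []  # futures for subdir crawls currently running in worker threads
    for entry in os.scandir(path):
        if entry.is_dir(follow_symlinks=False):
            with budget_lock:
                can_submit = budget > 0
                if can_submit:
                    budget -= 1        # reserve a worker from the budget
            if can_submit:
                # a spare worker is available: crawl the subdir concurrently
                pending.append(executor.submit(crawl, executor, entry.path))
            else:
                # budget exhausted: crawl the subdir inline in this thread
                total += crawl(executor, entry.path)
        elif entry.is_file(follow_symlinks=False):
            total += entry.stat().st_size
        # harvest subdir crawls that have already finished and return their slots
        done = [f for f in pending if f.done()]
        for f in done:
            total += f.result()
            pending.remove(f)
        if done:
            with budget_lock:
                budget += len(done)
    # wait for any subdir crawls still outstanding
    for f in pending:
        total += f.result()
    if pending:
        with budget_lock:
            budget += len(pending)
    return total

if __name__ == '__main__':
    with ThreadPoolExecutor(max_workers=MAXTHREADS) as executor:
        print(crawl(executor, '.'))

Harvesting inside the scan loop, rather than only at the end, matters: finished crawls hand their budget slot back quickly, so directories discovered later (and deeper) can still be crawled in parallel. Because the number of outstanding futures never exceeds the budget, a blocked worker can always be satisfied and the pool cannot deadlock on its own tasks.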
1 parent 2049cde commit bc3ed6a

1 file changed

Lines changed: 96 additions & 115 deletions

File tree

diskover/diskover.py

@@ -75,9 +75,11 @@ def install_win_sig_handler():
 bulktime = {}
 warnings = 0
 scan_paths = []
+docs_buffer = {}
 
 crawl_thread_lock = Lock()
 crawl_tree_queue = Queue()
+crawl_thread_budget = 0
 
 quit = False
 emptyindex = False
@@ -176,13 +178,33 @@ def receive_signal(signum, frame):
     close_app()
     sys.exit(signum)
 
-
-def start_bulk_upload(thread, root, docs, doccount):
+def append_doc(thread, root, doc):
+    """Append doc to docs list and upload to ES in batches. Sharded by thread."""
+    global docs_buffer
+    with crawl_thread_lock:
+        if thread not in docs_buffer:
+            docs_buffer[thread] = []
+        docs_buffer[thread].append(doc)
+        docs = docs_buffer[thread]
+        if len(docs) < config['ES_CHUNKSIZE']:
+            return
+        docs_buffer[thread] = []
+    start_bulk_upload(thread, root, docs)
+
+def flush_docs_buffer(thread, root):
+    """Flush docs buffer and upload to ES. Sharded by thread."""
+    global docs_buffer
+    with crawl_thread_lock:
+        docs = docs_buffer[thread]
+        docs_buffer[thread] = []
+    start_bulk_upload(thread, root, docs)
+
+def start_bulk_upload(thread, root, docs):
     """Bulk uploads docs to es index."""
     global bulktime
 
     if DEBUG:
-        logger.debug('[{0}] bulk uploading {1} docs to ES...'.format(thread, doccount))
+        logger.debug('[{0}] bulk uploading {1} docs to ES...'.format(thread, len(docs)))
     es_upload_start = time.time()
     try:
         bulk_upload(es, options.index, docs)
@@ -194,12 +216,9 @@ def start_bulk_upload(thread, root, docs, doccount):
     es_upload_time = time.time() - es_upload_start
     if DEBUG:
         logger.debug('[{0}] bulk uploading {1} docs completed in {2:.3f}s'.format(
-            thread, doccount, es_upload_time))
+            thread, len(docs), es_upload_time))
     with crawl_thread_lock:
         bulktime[root] += es_upload_time
-
-    return doccount
-
 
 def log_stats_thread(root):
     """Shows crawl and es upload stats."""
@@ -235,7 +254,7 @@ def log_stats_thread(root):
         root, total_doc_count[root], elapsed, dps, dps_max, dps_min, dps_avg))
 
 
-def get_tree_size(thread, root, top, path, docs, sizes, inodes, depth=0, maxdepth=999):
+def get_tree_size(executor, thread, root, top, path, sizes, inodes, depth=0, maxdepth=999):
     """Return total size of all files in directory tree at path."""
     global filecount
     global skipfilecount
@@ -244,6 +263,7 @@ def get_tree_size(thread, root, top, path, docs, sizes, inodes, depth=0, maxdept
     global skipdircount
     global total_doc_count
     global warnings
+    global crawl_thread_budget
 
     size = 0
     size_du = 0
@@ -259,7 +279,7 @@ def get_tree_size(thread, root, top, path, docs, sizes, inodes, depth=0, maxdept
     size_du_norecurs = 0
     files_norecurs = 0
     dirs_norecurs = 0
-
+
     # use alt scanner
     # try to get stat info for dir path
     if options.altscanner:
@@ -296,6 +316,7 @@ def get_tree_size(thread, root, top, path, docs, sizes, inodes, depth=0, maxdept
             return 0, 0, 0, 0
 
     # scan directory
+    crawl_futures = []
    try:
        if DEBUG:
            logger.debug('[{0}] Scanning path {1}...'.format(thread, path))
@@ -326,11 +347,22 @@ def get_tree_size(thread, root, top, path, docs, sizes, inodes, depth=0, maxdept
                     if depth < maxdepth:
                         # recurse into subdir
                         if not quit:
-                            s, sdu, fc, dc = get_tree_size(thread, root, top, dir_path, docs, sizes, inodes, depth+1, maxdepth)
-                            size += s
-                            size_du += sdu
-                            files += fc
-                            dirs += dc
+                            with crawl_thread_lock:
+                                can_submit = crawl_thread_budget > 0
+                                if can_submit:
+                                    crawl_thread_budget -= 1
+
+                            if can_submit:
+                                # if a spare thread is available, do the subdir crawl concurrently
+                                future = executor.submit(get_tree_size, executor, thread, root, top, dir_path, sizes, inodes, depth+1, maxdepth)
+                                crawl_futures.append(future)
+                            else:
+                                # fallback: do the crawl in the same thread recursively
+                                s, sdu, fc, dc = get_tree_size(executor, thread, root, top, dir_path, sizes, inodes, depth+1, maxdepth)
+                                size += s
+                                size_du += sdu
+                                files += fc
+                                dirs += dc
                     else:
                         if DEBUG:
                             logger.debug('[{0}] not descending {1}, maxdepth {2} reached'.format(
@@ -344,6 +376,21 @@ def get_tree_size(thread, root, top, path, docs, sizes, inodes, depth=0, maxdept
                         if options.verbose or options.vverbose:
                             logger.info('[{0}] skipping dir {1}'.format(thread, entry.path))
                         d_skip_count += 1
+                # poll for completed subdir crawls and collect results
+                if crawl_futures:
+                    completed = []
+                    for future in crawl_futures:
+                        if future.done():
+                            s, sdu, fc, dc = future.result()
+                            size += s
+                            size_du += sdu
+                            files += fc
+                            dirs += dc
+                            completed.append(future)
+                    for future in completed:
+                        crawl_futures.remove(future)
+                    with crawl_thread_lock:
+                        crawl_thread_budget += len(completed)
             else:
                 f_count += 1
                 if not file_excluded(entry.name):
@@ -517,13 +564,8 @@ def get_tree_size(thread, root, top, path, docs, sizes, inodes, depth=0, maxdept
                             warnings += 1
                         pass
                     # add file doc to docs list and upload to ES once it reaches certain size
-                    docs.append(data.copy())
-                    doc_count = len(docs)
-                    if doc_count >= config['ES_CHUNKSIZE']:
-                        doc_count = start_bulk_upload(thread, root, docs, doc_count)
-                        tot_doc_count += doc_count
-                        docs.clear()
-
+                    append_doc(thread, root, data.copy())
+                    tot_doc_count += 1
                 else:
                     f_skip_count += 1
                     if DEBUG:
@@ -549,6 +591,17 @@ def get_tree_size(thread, root, top, path, docs, sizes, inodes, depth=0, maxdept
                     if options.verbose or options.vverbose:
                         logger.info('[{0}] file name excluded, skipping file {1}'.format(thread, entry.path))
 
+        # wait for any remaining subdir crawls to complete
+        if crawl_futures:
+            for future in crawl_futures:
+                s, sdu, fc, dc = future.result()
+                size += s
+                size_du += sdu
+                files += fc
+                dirs += dc
+            with crawl_thread_lock:
+                crawl_thread_budget += len(crawl_futures)
+            crawl_futures = []
         # if not excluding empty dirs is set or exclude empty dirs is set but there are files or
         # dirs in the current directory, index the dir
         if not config['EXCLUDES_EMPTYDIRS'] or (config['EXCLUDES_EMPTYDIRS'] and (files > 0 or dirs > 0)):
@@ -679,13 +732,8 @@ def get_tree_size(thread, root, top, path, docs, sizes, inodes, depth=0, maxdept
 
             if depth > 0:
                 # add file doc to docs list and upload to ES once it reaches certain size
-                docs.append(data.copy())
-                doc_count = len(docs)
-                if doc_count >= config['ES_CHUNKSIZE']:
-                    doc_count = start_bulk_upload(thread, root, docs, doc_count)
-                    tot_doc_count += doc_count
-                    docs.clear()
-
+                append_doc(thread, root, data.copy())
+                tot_doc_count += 1
             else:
                 with crawl_thread_lock:
                     sizes[root] = data.copy()
@@ -729,6 +777,12 @@ def get_tree_size(thread, root, top, path, docs, sizes, inodes, depth=0, maxdept
         with crawl_thread_lock:
             warnings += 1
         pass
+    # post-exception cleanup:
+    if crawl_futures:
+        for future in crawl_futures:
+            future.result() # wait for subdir crawl to finish; discard results
+        with crawl_thread_lock:
+            crawl_thread_budget += len(crawl_futures)
 
     return size, size_du, files, dirs
 
@@ -746,111 +800,38 @@ def crawl(root):
     sizes = {}
     inodes = set()
 
-    def crawl_thread(root, top, depth, maxdepth, sizes, inodes):
+    def crawl_tree(root, top, maxdepth, sizes, inodes):
         global total_doc_count
         global scan_paths
+        global crawl_thread_budget
         thread = current_thread().name
 
         crawl_start = time.time()
-        docs = []
         with crawl_thread_lock:
             scan_paths.append(top)
         if DEBUG:
-            logger.debug('[{0}] starting crawl {1} (depth {2}, maxdepth {3})...'.format(thread, top, depth, maxdepth))
+            logger.debug('[{0}] starting crawl {1} (depth {2}, maxdepth {3})...'.format(thread, top, 0, maxdepth))
         if options.verbose or options.vverbose:
-            logger.info('[{0}] starting crawl {1} (depth {2}, maxdepth {3})...'.format(thread, top, depth, maxdepth))
-        size, size_du, file_count, dir_count = get_tree_size(thread, root, top, top, docs, sizes, inodes, depth, maxdepth)
-        doc_count = len(docs)
-        if doc_count > 0:
-            doc_count = start_bulk_upload(thread, root, docs, doc_count)
-            with crawl_thread_lock:
-                total_doc_count[root] += doc_count
-            docs.clear()
-        # Add sizes of subdir to root dir
-        if depth > 0:
-            with crawl_thread_lock:
-                sizes[top] = {
-                    'size': size,
-                    'size_du': size_du,
-                    'file_count': file_count,
-                    'dir_count': dir_count
-                }
-            if size > 0:
-                with crawl_thread_lock:
-                    sizes[root]['size'] += sizes[top]['size']
-                    sizes[root]['size_du'] += sizes[top]['size_du']
-                    sizes[root]['dir_count'] += sizes[top]['dir_count']
-                    sizes[root]['file_count'] += sizes[top]['file_count']
+            logger.info('[{0}] starting crawl {1} (depth {2}, maxdepth {3})...'.format(thread, top, 0, maxdepth))
+        with futures.ThreadPoolExecutor(max_workers=maxthreads) as executor:
+            crawl_thread_budget = maxthreads
+            size, size_du, file_count, dir_count = get_tree_size(executor, thread, root, top, top, sizes, inodes, 0, maxdepth)
+        flush_docs_buffer(thread, root)
 
         crawl_time = get_time(time.time() - crawl_start)
         logger.info('[{0}] finished crawling {1} ({2} dirs, {3} files, {4}) in {5}'.format(
             thread, top, dir_count, file_count, convert_size(size), crawl_time))
         with crawl_thread_lock:
             scan_paths.remove(top)
 
-
     scandir_walk_start = time.time()
-
-    # find all subdirs at level 1
-    subdir_list = []
     try:
-        if DEBUG:
-            logger.debug('Scanning path {0}...'.format(root))
-        if options.verbose or options.vverbose:
-            logger.info('Scanning path {0}...'.format(root))
-        for entry in os.scandir(root):
-            if DEBUG:
-                logger.debug('Scanning dir entry {0}...'.format(entry.path))
-            if options.vverbose:
-                logger.info('Scanning dir entry {0}...'.format(entry.path))
-            if entry.is_symlink():
-                pass
-            elif entry.is_dir():
-                if IS_WIN and options.altscanner is None:
-                    dir_path = rem_win_path(entry.path)
-                else:
-                    dir_path = entry.path
-                if not dir_excluded(dir_path):
-                    subdir_list.append(dir_path)
-                else:
-                    if DEBUG:
-                        logger.debug('dir excluded, skipping dir {0}'.format(entry.path))
-                    if options.verbose or options.vverbose:
-                        logger.info('dir excluded, skipping dir {0}'.format(entry.path))
-                    skipdircount[root] += 1
-    except OSError as e:
-        logmsg = 'OS ERROR: {0}'.format(e)
-        logger.warning(logmsg)
-        if config['LOGTOFILE']: logger_warn.warning(logmsg)
-        warnings += 1
-        pass
-    if len(subdir_list) > 0:
-        logger.info('found {0} subdirs at level 1, starting threads...'.format(len(subdir_list)))
-    else:
-        logger.info('found 0 subdirs at level 1')
-
-    with futures.ThreadPoolExecutor(max_workers=maxthreads) as executor:
-        # Set up thread to crawl rootdir (not recursive)
-        future = executor.submit(crawl_thread, root, root, 0, 0, sizes, inodes)
-        try:
-            data = future.result()
-        except Exception as e:
-            logmsg = 'FATAL ERROR: an exception has occurred: {0}'.format(e)
-            logger.critical(logmsg, exc_info=1)
-            if config['LOGTOFILE']: logger_warn.critical(logmsg, exc_info=1)
-            close_app_critical_error()
-
-        # Set up threads to crawl (recursive) from each of the level 1 subdirs
-        futures_subdir = {executor.submit(crawl_thread, root, subdir, 1, options.maxdepth, sizes, inodes): subdir for subdir in subdir_list}
-        for future in futures.as_completed(futures_subdir):
-            try:
-                data = future.result()
-            except Exception as e:
-                logmsg = 'FATAL ERROR: an exception has occurred: {0}'.format(e)
-                logger.critical(logmsg, exc_info=1)
-                if config['LOGTOFILE']: logger_warn.critical(logmsg, exc_info=1)
-                close_app_critical_error()
-
+        crawl_tree(root, root, options.maxdepth, sizes, inodes)
+    except Exception as e:
+        logmsg = 'FATAL ERROR: an exception has occurred: {0}'.format(e)
+        logger.critical(logmsg, exc_info=1)
+        if config['LOGTOFILE']: logger_warn.critical(logmsg, exc_info=1)
+        close_app_critical_error()
     scandir_walk_time = time.time() - scandir_walk_start
     end_time = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
 
0 commit comments