diff --git a/src/apps/competitions/tasks.py b/src/apps/competitions/tasks.py index eeaa2ffae..5c88b1163 100644 --- a/src/apps/competitions/tasks.py +++ b/src/apps/competitions/tasks.py @@ -1,47 +1,47 @@ import asyncio +import logging import os +from queue import Queue import re import traceback import zipfile from datetime import timedelta, datetime - +from django.conf import settings from io import BytesIO -from tempfile import TemporaryDirectory, NamedTemporaryFile - -# import json -# import urllib +from tempfile import NamedTemporaryFile, TemporaryDirectory import oyaml as yaml import requests from celery._state import app_or_default +from competitions.models import ( + Competition, + CompetitionCreationTaskStatus, + CompetitionDump, + Phase, + Submission, + SubmissionDetails, +) +from competitions.unpackers.utils import CompetitionUnpackingException +from competitions.unpackers.v1 import V15Unpacker +from competitions.unpackers.v2 import V2Unpacker +from datasets.models import Data from django.conf import settings -# from django_redis import get_redis_connection from django.core.exceptions import ObjectDoesNotExist from django.core.files.base import ContentFile -from django.db.models import Subquery, OuterRef, Count, Case, When, Value, F from django.db import transaction +from django.db.models import Case, Count, F, OuterRef, Subquery, Value, When from django.utils.text import slugify from django.utils.timezone import now -from rest_framework.exceptions import ValidationError - -from celery_config import app # , app_for_vhost -from competitions.models import Submission, CompetitionCreationTaskStatus, SubmissionDetails, Competition, \ - CompetitionDump, Phase -# from queues.models import Queue -from competitions.unpackers.utils import CompetitionUnpackingException -from competitions.unpackers.v1 import V15Unpacker -from competitions.unpackers.v2 import V2Unpacker from leaderboards.models import Leaderboard +from rest_framework.exceptions import ValidationError from tasks.models import Task -from datasets.models import Data + +from celery_config import app from utils.data import make_url_sassy from utils.email import codalab_send_markdown_email -from channels.layers import get_channel_layer -from asgiref.sync import async_to_sync import logging -# from utils.worker_utils import WORKER_HEARTBEAT_TTL, WORKERS_REGISTRY_KEY, extract_queue_names, is_compute_worker, known_compute_queue_names logger = logging.getLogger(__name__) COMPETITION_FIELDS = [ @@ -61,35 +61,35 @@ "reward", "contact_email", "fact_sheet", - "forum_enabled" + "forum_enabled", ] TASK_FIELDS = [ - 'name', - 'description', - 'key', - 'is_public', + "name", + "description", + "key", + "is_public", ] SOLUTION_FIELDS = [ - 'name', - 'description', - 'tasks', - 'key', + "name", + "description", + "tasks", + "key", ] PHASE_FIELDS = [ - 'index', - 'name', - 'description', - 'start', - 'end', - 'max_submissions_per_day', - 'max_submissions_per_person', - 'execution_time_limit', - 'auto_migrate_to_this_phase', - 'hide_output', - 'hide_prediction_output', - 'hide_score_output', + "index", + "name", + "description", + "start", + "end", + "max_submissions_per_day", + "max_submissions_per_person", + "execution_time_limit", + "auto_migrate_to_this_phase", + "hide_output", + "hide_prediction_output", + "hide_score_output", ] PHASE_FILES = [ "input_data", @@ -99,15 +99,12 @@ "public_data", "starting_kit", ] -PAGE_FIELDS = [ - "title" -] +PAGE_FIELDS = ["title"] LEADERBOARD_FIELDS = [ - 'title', - 'key', - 'hidden', - 'submission_rule', - + "title", + "key", + "hidden", + "submission_rule", # For later # 'force_submission_to_leaderboard', # 'force_best_submission_to_leaderboard', @@ -115,16 +112,18 @@ ] COLUMN_FIELDS = [ - 'title', - 'key', - 'index', - 'sorting', - 'computation', - 'computation_indexes', - 'hidden', - 'precision', + "title", + "key", + "index", + "sorting", + "computation", + "computation_indexes", + "hidden", + "precision", ] -MAX_EXECUTION_TIME_LIMIT = int(os.environ.get('MAX_EXECUTION_TIME_LIMIT', 600)) # time limit of the default queue +MAX_EXECUTION_TIME_LIMIT = int( + os.environ.get("MAX_EXECUTION_TIME_LIMIT", 600) +) # time limit of the default queue def _send_to_compute_worker(submission, is_scoring): @@ -133,43 +132,51 @@ def _send_to_compute_worker(submission, is_scoring): "submissions_api_url": settings.SUBMISSIONS_API_URL, "secret": submission.secret, "docker_image": submission.phase.competition.docker_image, - "execution_time_limit": min(MAX_EXECUTION_TIME_LIMIT, submission.phase.execution_time_limit), + "execution_time_limit": min( + MAX_EXECUTION_TIME_LIMIT, submission.phase.execution_time_limit + ), "id": submission.pk, "is_scoring": is_scoring, } - if not submission.detailed_result.name and submission.phase.competition.enable_detailed_results: - submission.detailed_result.save('detailed_results.html', ContentFile(''.encode())) # must encode here for GCS - submission.save(update_fields=['detailed_result']) + if ( + not submission.detailed_result.name + and submission.phase.competition.enable_detailed_results + ): + submission.detailed_result.save( + "detailed_results.html", ContentFile("".encode()) + ) # must encode here for GCS + submission.save(update_fields=["detailed_result"]) if not submission.prediction_result.name: - submission.prediction_result.save('prediction_result.zip', ContentFile(''.encode())) # must encode here for GCS - submission.save(update_fields=['prediction_result']) + submission.prediction_result.save( + "prediction_result.zip", ContentFile("".encode()) + ) # must encode here for GCS + submission.save(update_fields=["prediction_result"]) if not submission.scoring_result.name: - submission.scoring_result.save('scoring_result.zip', ContentFile(''.encode())) # must encode here for GCS - submission.save(update_fields=['scoring_result']) + submission.scoring_result.save( + "scoring_result.zip", ContentFile("".encode()) + ) # must encode here for GCS + submission.save(update_fields=["scoring_result"]) submission = Submission.objects.get(id=submission.id) task = submission.task if not is_scoring: - run_args['prediction_result'] = make_url_sassy( - path=submission.prediction_result.name, - permission='w' + run_args["prediction_result"] = make_url_sassy( + path=submission.prediction_result.name, permission="w" ) else: if submission.phase.competition.enable_detailed_results: - run_args['detailed_results_url'] = make_url_sassy( + run_args["detailed_results_url"] = make_url_sassy( path=submission.detailed_result.name, - permission='w', - content_type='text/html' + permission="w", + content_type="text/html", ) - run_args['prediction_result'] = make_url_sassy( - path=submission.prediction_result.name, - permission='r' + run_args["prediction_result"] = make_url_sassy( + path=submission.prediction_result.name, permission="r" ) - run_args['scoring_result'] = make_url_sassy( - path=submission.scoring_result.name, - permission='w' + run_args["scoring_result"] = make_url_sassy( + path=submission.scoring_result.name, permission="w" ) if task.ingestion_program: @@ -177,12 +184,12 @@ def _send_to_compute_worker(submission, is_scoring): run_args['ingestion_program_data'] = make_url_sassy(task.ingestion_program.data_file.name) if task.input_data and (not is_scoring or task.ingestion_only_during_scoring): - run_args['input_data'] = make_url_sassy(task.input_data.data_file.name) + run_args["input_data"] = make_url_sassy(task.input_data.data_file.name) if is_scoring and task.reference_data: - run_args['reference_data'] = make_url_sassy(task.reference_data.data_file.name) + run_args["reference_data"] = make_url_sassy(task.reference_data.data_file.name) - run_args['ingestion_only_during_scoring'] = task.ingestion_only_during_scoring + run_args["ingestion_only_during_scoring"] = task.ingestion_only_during_scoring if is_scoring: run_args['scoring_program_data'] = make_url_sassy(path=task.scoring_program.data_file.name) @@ -208,9 +215,13 @@ def _send_to_compute_worker(submission, is_scoring): time_padding = 60 * 20 # 20 minutes time_limit = submission.phase.execution_time_limit + time_padding - if submission.phase.competition.queue: # if the competition is running on a custom queue, not the default queue + if ( + submission.phase.competition.queue + ): # if the competition is running on a custom queue, not the default queue submission.queue = submission.phase.competition.queue - run_args['execution_time_limit'] = submission.phase.execution_time_limit # use the competition time limit + run_args["execution_time_limit"] = ( + submission.phase.execution_time_limit + ) # use the competition time limit submission.save(update_fields=["queue"]) if submission.status == Submission.SUBMITTING: # Don't want to mark an already-prepared submission as "submitted" again, so @@ -225,20 +236,22 @@ def _enqueue_after_commit(): if submission.phase.competition.queue: celery_app = app_or_default() with celery_app.connection() as new_connection: - new_connection.virtual_host = str(submission.phase.competition.queue.vhost) + new_connection.virtual_host = str( + submission.phase.competition.queue.vhost + ) task = celery_app.send_task( - 'compute_worker_run', + "compute_worker_run", args=(run_args,), - queue='compute-worker', + queue="compute-worker", soft_time_limit=time_limit, connection=new_connection, priority=priority, ) else: task = app.send_task( - 'compute_worker_run', + "compute_worker_run", args=(run_args,), - queue='compute-worker', + queue="compute-worker", soft_time_limit=time_limit, priority=priority, ) @@ -250,8 +263,12 @@ def _enqueue_after_commit(): def create_detailed_output_file(detail_name, submission): # Detail logs like stdout/etc. - new_details = SubmissionDetails.objects.create(submission=submission, name=detail_name) - new_details.data_file.save(f'{detail_name}.txt', ContentFile(''.encode())) # must encode here for GCS + new_details = SubmissionDetails.objects.create( + submission=submission, name=detail_name + ) + new_details.data_file.save( + f"{detail_name}.txt", ContentFile("".encode()) + ) # must encode here for GCS return make_url_sassy(new_details.data_file.name, permission="w") @@ -262,47 +279,49 @@ def run_submission(submission_pk, tasks=None, is_scoring=False): def send_submission_message(submission, data): from channels.layers import get_channel_layer + channel_layer = get_channel_layer() user = submission.owner - asyncio.get_event_loop().run_until_complete(channel_layer.group_send(f"submission_listening_{user.pk}", { - 'type': 'submission.message', - 'text': data, - 'submission_id': submission.pk, - })) + asyncio.get_event_loop().run_until_complete( + channel_layer.group_send( + f"submission_listening_{user.pk}", + { + "type": "submission.message", + "text": data, + "submission_id": submission.pk, + }, + ) + ) def send_parent_status(submission): """Helper function we can mock in tests, instead of having to do async mocks""" - send_submission_message(submission, { - "kind": "status_update", - "status": "Running" - }) + send_submission_message(submission, {"kind": "status_update", "status": "Running"}) def send_child_id(submission, child_id): """Helper function we can mock in tests, instead of having to do async mocks""" - send_submission_message(submission, { - "kind": "child_update", - "child_id": child_id - }) + send_submission_message(submission, {"kind": "child_update", "child_id": child_id}) -@app.task(queue='site-worker', soft_time_limit=60) +@app.task(queue="site-worker", soft_time_limit=60) def _run_submission(submission_pk, task_pks=None, is_scoring=False): """This function is wrapped so that when we run tests we can run this function not via celery""" select_models = ( - 'phase', - 'phase__competition', + "phase", + "phase__competition", ) prefetch_models = ( - 'details', - 'phase__tasks__input_data', - 'phase__tasks__reference_data', - 'phase__tasks__scoring_program', - 'phase__tasks__ingestion_program', + "details", + "phase__tasks__input_data", + "phase__tasks__reference_data", + "phase__tasks__scoring_program", + "phase__tasks__ingestion_program", + ) + qs = Submission.objects.select_related(*select_models).prefetch_related( + *prefetch_models ) - qs = Submission.objects.select_related(*select_models).prefetch_related(*prefetch_models) submission = qs.get(pk=submission_pk) if submission.is_specific_task_re_run: @@ -313,7 +332,7 @@ def _run_submission(submission_pk, task_pks=None, is_scoring=False): else: tasks = submission.phase.tasks.filter(pk__in=task_pks) - tasks = tasks.order_by('pk') + tasks = tasks.order_by("pk") if len(tasks) > 1: # The initial submission object becomes the parent submission and we create children for each task @@ -331,7 +350,7 @@ def _run_submission(submission_pk, task_pks=None, is_scoring=False): participant=submission.participant, parent=submission, task=task, - fact_sheet_answers=submission.fact_sheet_answers + fact_sheet_answers=submission.fact_sheet_answers, ) child_sub.save(ignore_submission_limit=True) _send_to_compute_worker(child_sub, is_scoring=False) @@ -344,7 +363,7 @@ def _run_submission(submission_pk, task_pks=None, is_scoring=False): _send_to_compute_worker(submission, is_scoring) -@app.task(queue='site-worker', soft_time_limit=60 * 60) # 1 hour timeout +@app.task(queue="site-worker", soft_time_limit=60 * 60) # 1 hour timeout def unpack_competition(status_pk): logger.info(f"Starting unpack with status pk = {status_pk}") status = CompetitionCreationTaskStatus.objects.get(pk=status_pk) @@ -365,8 +384,12 @@ def mark_status_as_failed_and_delete_dataset(competition_creation_status, detail # Extract bundle try: with NamedTemporaryFile(mode="w+b") as temp_file: - logger.info(f"Download competition bundle: {competition_dataset.data_file.name}") - competition_bundle_url = make_url_sassy(competition_dataset.data_file.url) + logger.info( + f"Download competition bundle: {competition_dataset.data_file.name}" + ) + competition_bundle_url = make_url_sassy( + competition_dataset.data_file.url + ) try: with requests.get(competition_bundle_url, stream=True) as r: r.raise_for_status() @@ -374,12 +397,14 @@ def mark_status_as_failed_and_delete_dataset(competition_creation_status, detail temp_file.write(chunk) r.close() except requests.exceptions.RequestException as e: - raise CompetitionUnpackingException(f"Failed to download bundle from storage: {e}") + raise CompetitionUnpackingException( + f"Failed to download bundle from storage: {e}" + ) # seek back to the start of the tempfile after writing to it.. temp_file.seek(0) - with zipfile.ZipFile(temp_file.name, 'r') as zip_pointer: + with zipfile.ZipFile(temp_file.name, "r") as zip_pointer: zip_pointer.extractall(temp_directory) except zipfile.BadZipFile: raise CompetitionUnpackingException("Bad zip file uploaded.") @@ -396,20 +421,24 @@ def mark_status_as_failed_and_delete_dataset(competition_creation_status, detail with open(yaml_path) as f: competition_yaml = yaml.safe_load(f.read()) except yaml.YAMLError as e: - raise CompetitionUnpackingException(f"Error parsing competition.yaml: {e}") + raise CompetitionUnpackingException( + f"Error parsing competition.yaml: {e}" + ) except Exception as e: - raise CompetitionUnpackingException(f"Failed to read competition.yaml: {e}") + raise CompetitionUnpackingException( + f"Failed to read competition.yaml: {e}" + ) - yaml_version = str(competition_yaml.get('version', '1')) + yaml_version = str(competition_yaml.get("version", "1")) logger.info(f"The YAML version is: {yaml_version}") - if yaml_version in ['1', '1.5']: + if yaml_version in ["1", "1.5"]: unpacker_class = V15Unpacker - elif yaml_version == '2': + elif yaml_version == "2": unpacker_class = V2Unpacker else: raise CompetitionUnpackingException( - 'A suitable version could not be found for this competition. Make sure one is supplied in the yaml.' + "A suitable version could not be found for this competition. Make sure one is supplied in the yaml." ) unpacker = unpacker_class( @@ -422,6 +451,7 @@ def mark_status_as_failed_and_delete_dataset(competition_creation_status, detail try: competition = unpacker.save() except ValidationError as e: + def _get_error_string(error_dict): """Helps us nicely print out a ValidationError""" for key, errors in error_dict.items(): @@ -463,7 +493,7 @@ def _get_error_string(error_dict): mark_status_as_failed_and_delete_dataset(status, message) -@app.task(queue='site-worker', soft_time_limit=60 * 10) +@app.task(queue="site-worker", soft_time_limit=60 * 10) def create_competition_dump(competition_pk, keys_instead_of_files=False): yaml_data = {"version": "2"} try: @@ -472,7 +502,7 @@ def create_competition_dump(competition_pk, keys_instead_of_files=False): logger.info(f"Finding competition {competition_pk}") comp = Competition.objects.get(pk=competition_pk) zip_buffer = BytesIO() - current_date_time = datetime.today().strftime('%Y-%m-%d %H:%M:%S') + current_date_time = datetime.today().strftime("%Y-%m-%d %H:%M:%S") zip_name = f"{comp.title}-{current_date_time}.zip" zip_file = zipfile.ZipFile(zip_buffer, "w") @@ -480,14 +510,14 @@ def create_competition_dump(competition_pk, keys_instead_of_files=False): for field in COMPETITION_FIELDS: if hasattr(comp, field): value = getattr(comp, field, "") - if field == 'queue' and value is not None: + if field == "queue" and value is not None: value = str(value.vhost) yaml_data[field] = value if comp.logo: logger.info("Checking logo") try: - yaml_data['image'] = re.sub(r'.*/', '', comp.logo.name) - zip_file.writestr(yaml_data['image'], comp.logo.read()) + yaml_data["image"] = re.sub(r".*/", "", comp.logo.name) + zip_file.writestr(yaml_data["image"], comp.logo.read()) logger.info(f"Logo found for competition {comp.pk}") except OSError: logger.warning( @@ -496,25 +526,25 @@ def create_competition_dump(competition_pk, keys_instead_of_files=False): # -------- Competition Terms ------- if comp.terms: - yaml_data['terms'] = 'terms.md' - zip_file.writestr('terms.md', comp.terms) + yaml_data["terms"] = "terms.md" + zip_file.writestr("terms.md", comp.terms) # -------- Competition Pages ------- - yaml_data['pages'] = [] + yaml_data["pages"] = [] for page in comp.pages.all(): temp_page_data = {} for field in PAGE_FIELDS: if hasattr(page, field): temp_page_data[field] = getattr(page, field, "") page_file_name = f"{slugify(page.title)}-{page.pk}.md" - temp_page_data['file'] = page_file_name - yaml_data['pages'].append(temp_page_data) - zip_file.writestr(temp_page_data['file'], page.content) + temp_page_data["file"] = page_file_name + yaml_data["pages"].append(temp_page_data) + zip_file.writestr(temp_page_data["file"], page.content) # -------- Competition Tasks/Solutions ------- - yaml_data['tasks'] = [] - yaml_data['solutions'] = [] + yaml_data["tasks"] = [] + yaml_data["solutions"] = [] task_solution_pairs = {} tasks = [task for phase in comp.phases.all() for task in phase.tasks.all()] @@ -525,23 +555,18 @@ def create_competition_dump(competition_pk, keys_instead_of_files=False): for index, task in enumerate(tasks): task_solution_pairs[task.id] = { - 'index': index, - 'solutions': { - 'ids': [], - 'indexes': [] - } + "index": index, + "solutions": {"ids": [], "indexes": []}, } - temp_task_data = { - 'index': index - } + temp_task_data = {"index": index} for field in TASK_FIELDS: data = getattr(task, field, "") # If keys_instead of files is not true and field is key, then skip this filed - if not keys_instead_of_files and field == 'key': + if not keys_instead_of_files and field == "key": continue - if field == 'key': + if field == "key": data = str(data) temp_task_data[field] = data @@ -554,116 +579,136 @@ def create_competition_dump(competition_pk, keys_instead_of_files=False): temp_task_data[file_type] = str(temp_dataset.key) else: try: - temp_task_data[file_type] = f"{file_type}-{task.pk}.zip" - zip_file.writestr(temp_task_data[file_type], temp_dataset.data_file.read()) + temp_task_data[file_type] = ( + f"{file_type}-{task.pk}.zip" + ) + zip_file.writestr( + temp_task_data[file_type], + temp_dataset.data_file.read(), + ) except OSError: logger.error( f"The file field is set, but no actual" f" file was found for dataset: {temp_dataset.pk} with name {temp_dataset.name}" ) else: - logger.warning(f"Could not find data file for dataset object: {temp_dataset.pk}") + logger.warning( + f"Could not find data file for dataset object: {temp_dataset.pk}" + ) # Now for all of our solutions for the tasks, write those too for solution in task.solutions.all(): # for index_two, solution in enumerate(task.solutions.all()): # temp_index = index_two # IF OUR SOLUTION WAS ALREADY ADDED - if solution.id in task_solution_pairs[task.id]['solutions']['ids']: - for solution_data in yaml_data['solutions']: - if solution_data['key'] == solution.key: - solution_data['tasks'].append(task.id) + if solution.id in task_solution_pairs[task.id]["solutions"]["ids"]: + for solution_data in yaml_data["solutions"]: + if solution_data["key"] == solution.key: + solution_data["tasks"].append(task.id) break break # Else if our index is already taken - elif index_two in task_solution_pairs[task.id]['solutions']['indexes']: + elif index_two in task_solution_pairs[task.id]["solutions"]["indexes"]: index_two += 1 - task_solution_pairs[task.id]['solutions']['indexes'].append(index_two) - task_solution_pairs[task.id]['solutions']['ids'].append(solution.id) + task_solution_pairs[task.id]["solutions"]["indexes"].append(index_two) + task_solution_pairs[task.id]["solutions"]["ids"].append(solution.id) - temp_solution_data = { - 'index': index_two - } + temp_solution_data = {"index": index_two} for field in SOLUTION_FIELDS: if hasattr(solution, field): data = getattr(solution, field, "") - if field == 'key': + if field == "key": data = str(data) temp_solution_data[field] = data if solution.data: - temp_dataset = getattr(solution, 'data') + temp_dataset = getattr(solution, "data") if temp_dataset: if temp_dataset.data_file: try: - temp_solution_data['path'] = f"solution-{solution.pk}.zip" - zip_file.writestr(temp_solution_data['path'], temp_dataset.data_file.read()) + temp_solution_data["path"] = ( + f"solution-{solution.pk}.zip" + ) + zip_file.writestr( + temp_solution_data["path"], + temp_dataset.data_file.read(), + ) except OSError: logger.error( f"The file field is set, but no actual" f" file was found for dataset: {temp_dataset.pk} with name {temp_dataset.name}" ) else: - logger.warning(f"Could not find data file for dataset object: {temp_dataset.pk}") + logger.warning( + f"Could not find data file for dataset object: {temp_dataset.pk}" + ) # TODO: Make sure logic here is right. Needs to be outputted as a list, but what others can we tie to? - temp_solution_data['tasks'] = [index] - yaml_data['solutions'].append(temp_solution_data) + temp_solution_data["tasks"] = [index] + yaml_data["solutions"].append(temp_solution_data) index_two += 1 # End for loop for solutions; Append tasks data - yaml_data['tasks'].append(temp_task_data) + yaml_data["tasks"].append(temp_task_data) # -------- Competition Phases ------- - yaml_data['phases'] = [] + yaml_data["phases"] = [] for phase in comp.phases.all(): temp_phase_data = {} for field in PHASE_FIELDS: if hasattr(phase, field): - if field == 'start' or field == 'end': + if field == "start" or field == "end": temp_date = getattr(phase, field) if not temp_date: continue temp_date = temp_date.strftime("%Y-%m-%d") temp_phase_data[field] = temp_date - elif field == 'max_submissions_per_person': - temp_phase_data['max_submissions'] = getattr(phase, field) + elif field == "max_submissions_per_person": + temp_phase_data["max_submissions"] = getattr(phase, field) else: temp_phase_data[field] = getattr(phase, field, "") - task_indexes = [task_solution_pairs[task.id]['index'] for task in phase.tasks.all()] - temp_phase_data['tasks'] = task_indexes + task_indexes = [ + task_solution_pairs[task.id]["index"] for task in phase.tasks.all() + ] + temp_phase_data["tasks"] = task_indexes temp_phase_solutions = [] for task in phase.tasks.all(): - temp_phase_solutions += task_solution_pairs[task.id]['solutions']['indexes'] - temp_phase_data['solutions'] = temp_phase_solutions - yaml_data['phases'].append(temp_phase_data) - yaml_data['phases'] = sorted(yaml_data['phases'], key=lambda phase: phase['index']) + temp_phase_solutions += task_solution_pairs[task.id]["solutions"][ + "indexes" + ] + temp_phase_data["solutions"] = temp_phase_solutions + yaml_data["phases"].append(temp_phase_data) + yaml_data["phases"] = sorted( + yaml_data["phases"], key=lambda phase: phase["index"] + ) # -------- Leaderboards ------- - yaml_data['leaderboards'] = [] + yaml_data["leaderboards"] = [] # Have to grab leaderboards from phases - leaderboards = Leaderboard.objects.filter(id__in=comp.phases.all().values_list('leaderboard', flat=True)) + leaderboards = Leaderboard.objects.filter( + id__in=comp.phases.all().values_list("leaderboard", flat=True) + ) for index, leaderboard in enumerate(leaderboards): - ldb_data = { - 'index': index - } + ldb_data = {"index": index} for field in LEADERBOARD_FIELDS: if hasattr(leaderboard, field): ldb_data[field] = getattr(leaderboard, field, "") - ldb_data['columns'] = [] + ldb_data["columns"] = [] for column in leaderboard.columns.all(): col_data = {} for field in COLUMN_FIELDS: if hasattr(column, field): value = getattr(column, field, "") - if field == 'computation_indexes' and value is not None: - value = value.split(',') + if field == "computation_indexes" and value is not None: + value = value.split(",") if value is not None: col_data[field] = value - ldb_data['columns'].append(col_data) - yaml_data['leaderboards'].append(ldb_data) + ldb_data["columns"].append(col_data) + yaml_data["leaderboards"].append(ldb_data) # ------- Finalize -------- logger.info(f"YAML data to be written is: {yaml_data}") - comp_yaml = yaml.safe_dump(yaml_data, default_flow_style=False, allow_unicode=True, encoding="utf-8") + comp_yaml = yaml.safe_dump( + yaml_data, default_flow_style=False, allow_unicode=True, encoding="utf-8" + ) logger.info(f"YAML output: {comp_yaml}") zip_file.writestr("competition.yaml", comp_yaml) zip_file.close() @@ -674,8 +719,8 @@ def create_competition_dump(competition_pk, keys_instead_of_files=False): temp_dataset_bundle = Data.objects.create( created_by=comp.created_by, name=f"{comp.title} Dump #{bundle_count} Created {current_date_time}", - type='competition_bundle', - description='Automatically created competition dump', + type="competition_bundle", + description="Automatically created competition dump", # 'data_file'=, ) logger.info("Saving zip to Competition Bundle") @@ -684,61 +729,74 @@ def create_competition_dump(competition_pk, keys_instead_of_files=False): temp_comp_dump = CompetitionDump.objects.create( dataset=temp_dataset_bundle, status="Finished", - details="Competition Bundle {0} for Competition {1}".format(temp_dataset_bundle.pk, comp.pk), - competition=comp + details="Competition Bundle {0} for Competition {1}".format( + temp_dataset_bundle.pk, comp.pk + ), + competition=comp, + ) + logger.info( + f"Finished creating competition dump: {temp_comp_dump.pk} for competition: {comp.pk}" ) - logger.info(f"Finished creating competition dump: {temp_comp_dump.pk} for competition: {comp.pk}") except ObjectDoesNotExist: - logger.error("Could not find competition with pk {} to create a competition dump".format(competition_pk)) + logger.error( + "Could not find competition with pk {} to create a competition dump".format( + competition_pk + ) + ) -@app.task(queue='site-worker', soft_time_limit=60 * 5) +@app.task(queue="site-worker", soft_time_limit=60 * 5) def do_phase_migrations(): # Update phase statuses - previous_subquery = Phase.objects.filter( - competition=OuterRef('competition'), - end__lte=now() - ).order_by('-index').values('index')[:1] + previous_subquery = ( + Phase.objects.filter(competition=OuterRef("competition"), end__lte=now()) + .order_by("-index") + .values("index")[:1] + ) current_subquery = Phase.objects.filter( - competition=OuterRef('competition'), + competition=OuterRef("competition"), start__lte=now(), end__gt=now(), - ).values('index')[:1] + ).values("index")[:1] - next_subquery = Phase.objects.filter( - competition=OuterRef('competition'), - start__gt=now() - ).order_by('index').values('index')[:1] + next_subquery = ( + Phase.objects.filter(competition=OuterRef("competition"), start__gt=now()) + .order_by("index") + .values("index")[:1] + ) Phase.objects.annotate( previous_index=Subquery(previous_subquery), current_index=Subquery(current_subquery), next_index=Subquery(next_subquery), - ).update(status=Case( - When(index=F('previous_index'), then=Value(Phase.PREVIOUS)), - When(index=F('current_index'), then=Value(Phase.CURRENT)), - When(index=F('next_index'), then=Value(Phase.NEXT)), - default=None - )) + ).update( + status=Case( + When(index=F("previous_index"), then=Value(Phase.PREVIOUS)), + When(index=F("current_index"), then=Value(Phase.CURRENT)), + When(index=F("next_index"), then=Value(Phase.NEXT)), + default=None, + ) + ) # Updating Competitions whose phases have finished migrating to `is_migrating=False` - completed_statuses = [Submission.FINISHED, Submission.FAILED, Submission.CANCELLED, Submission.NONE] - - running_subs_query = Submission.objects.filter( - created_by_migration=OuterRef('pk') - ).exclude( - status__in=completed_statuses - ).values_list('pk')[:1] + completed_statuses = [ + Submission.FINISHED, + Submission.FAILED, + Submission.CANCELLED, + Submission.NONE, + ] + + running_subs_query = ( + Submission.objects.filter(created_by_migration=OuterRef("pk")) + .exclude(status__in=completed_statuses) + .values_list("pk")[:1] + ) Competition.objects.filter( - pk__in=Phase.objects.annotate( - running_subs=Count(Subquery(running_subs_query)) - ).filter( - running_subs=0, - competition__is_migrating=True, - status=Phase.PREVIOUS - ).values_list('competition__pk', flat=True) + pk__in=Phase.objects.annotate(running_subs=Count(Subquery(running_subs_query))) + .filter(running_subs=0, competition__is_migrating=True, status=Phase.PREVIOUS) + .values_list("competition__pk", flat=True) ).update(is_migrating=False) # Checking for new phases to start migrating @@ -746,7 +804,7 @@ def do_phase_migrations(): auto_migrate_to_this_phase=True, start__lte=now(), competition__is_migrating=False, - has_been_migrated=False + has_been_migrated=False, ) logger.info(f"Checking {len(new_phases)} phases for phase migrations.") @@ -755,67 +813,71 @@ def do_phase_migrations(): p.check_future_phase_submissions() -@app.task(queue='site-worker', soft_time_limit=60 * 5) +@app.task(queue="site-worker", soft_time_limit=60 * 5) def manual_migration(phase_id): try: source_phase = Phase.objects.get(id=phase_id) except Phase.DoesNotExist: - logger.error(f'Could not manually migrate phase with id: {phase_id}. Phase could not be found.') + logger.error( + f"Could not manually migrate phase with id: {phase_id}. Phase could not be found." + ) return try: - destination_phase = source_phase.competition.phases.get(index=source_phase.index + 1) + destination_phase = source_phase.competition.phases.get( + index=source_phase.index + 1 + ) except Phase.DoesNotExist: - logger.error(f'Could not manually migrate phase with id: {phase_id}. The next phase could not be found.') + logger.error( + f"Could not manually migrate phase with id: {phase_id}. The next phase could not be found." + ) return - destination_phase.competition.apply_phase_migration(source_phase, destination_phase, force_migration=True) + destination_phase.competition.apply_phase_migration( + source_phase, destination_phase, force_migration=True + ) -@app.task(queue='site-worker', soft_time_limit=60 * 5) +@app.task(queue="site-worker", soft_time_limit=60 * 5) def batch_send_email(comp_id, content): try: - competition = Competition.objects.prefetch_related('participants__user').get(id=comp_id) + competition = Competition.objects.prefetch_related("participants__user").get( + id=comp_id + ) except Competition.DoesNotExist: - logger.error(f'Not sending emails because competition with id {comp_id} could not be found') + logger.error( + f"Not sending emails because competition with id {comp_id} could not be found" + ) return codalab_send_markdown_email( - subject=f'A message from the admins of {competition.title}', + subject=f"A message from the admins of {competition.title}", markdown_content=content, - recipient_list=[participant.user.email for participant in competition.participants.all()] + recipient_list=[ + participant.user.email for participant in competition.participants.all() + ], ) -@app.task(queue='site-worker', soft_time_limit=60 * 5) +@app.task(queue="site-worker", soft_time_limit=60 * 5) def update_phase_statuses(): - competitions = Competition.objects.exclude(phases__in=Phase.objects.filter(is_final_phase=True, end__lt=now())) + competitions = Competition.objects.exclude( + phases__in=Phase.objects.filter(is_final_phase=True, end__lt=now()) + ) for comp in competitions: comp.update_phase_statuses() -@app.task(queue='site-worker') +@app.task(queue="site-worker") def submission_status_cleanup(): - submissions = Submission.objects.filter(status=Submission.RUNNING, has_children=False).select_related('phase', 'parent') + submissions = Submission.objects.filter( + status=Submission.RUNNING, has_children=False + ).select_related("phase", "parent") for sub in submissions: - # Check if the submission has been running for 24 hours longer than execution_time_limit - if sub.started_when < now() - timedelta(milliseconds=(3600000 * 24) + sub.phase.execution_time_limit): + if sub.started_when < now() - timedelta( + milliseconds=(3600000 * 24) + sub.phase.execution_time_limit + ): if sub.parent is not None: sub.parent.cancel(status=Submission.FAILED) else: sub.cancel(status=Submission.FAILED) - - -# ------------------------------------------------- -def _broadcast_worker_state(payload): - channel_layer = get_channel_layer() - if not channel_layer: - return - - async_to_sync(channel_layer.group_send)( - "compute_workers", - { - "type": "worker.health", - "worker": payload, - }, - ) diff --git a/src/celery_config.py b/src/celery_config.py index e8ba5961e..5a9c055bb 100644 --- a/src/celery_config.py +++ b/src/celery_config.py @@ -42,3 +42,4 @@ def app_for_vhost(vhost): vhost_app.conf.task_queues = app.conf.task_queues _vhost_apps[vhost] = vhost_app return _vhost_apps[vhost] + diff --git a/src/settings/base.py b/src/settings/base.py index 47c76ad6b..d2698bb2a 100644 --- a/src/settings/base.py +++ b/src/settings/base.py @@ -276,10 +276,6 @@ 'task': 'profiles.tasks.clean_non_activated_users', 'schedule': timedelta(days=1), # Run every 24 hours }, - "refresh_compute_worker_health": { - "task": "competitions.tasks.refresh_compute_worker_health", - "schedule": 60, - }, } CELERY_TIMEZONE = 'UTC' CELERY_WORKER_PREFETCH_MULTIPLIER = 1 diff --git a/src/static/riot/competitions/detail/_header.tag b/src/static/riot/competitions/detail/_header.tag index f572fa959..b8234bb1d 100644 --- a/src/static/riot/competitions/detail/_header.tag +++ b/src/static/riot/competitions/detail/_header.tag @@ -36,13 +36,11 @@ Migrate -
| Worker | +Competition | +Queue | Status | Jobs | Last seen | @@ -136,13 +134,10 @@ self.wsReconnectTimer = null self.wsState = 'disconnected' self.lastSyncAt = null - self.showDefaultWorkers = true self.canViewWorkersPanel = String(self.opts.can_view_workers_panel || 'false') === 'true' - self.competitionId = parseInt(self.opts.competition_id) || null - self.showWorkersPanel = false self.panelLeft = 24 self.panelTop = 24 @@ -154,7 +149,6 @@ self.toggleWorkersPanel = function () { self.showWorkersPanel = !self.showWorkersPanel - // here, we make sure the network is not used when the pannel is closed if (self.showWorkersPanel) { self.loadPanelPosition() self.connect_workers_socket() @@ -188,7 +182,6 @@ } catch (e) {} self.ws = null } - var competitionId = parseInt(self.opts.competition_id) || null var scheme = window.location.protocol === 'https:' ? 'wss' : 'ws' var url = scheme + '://' + window.location.host + '/ws/workers/' @@ -199,12 +192,16 @@ self.ws.onopen = function () { self.wsState = 'connected' - if (self.competitionId) { - var msg = JSON.stringify({ type: 'subscribe', competition_id: self.competitionId }) - self.ws.send(msg) + var competitionId = parseInt(self.opts.competition_id) || null + if (competitionId) { + self.ws.send(JSON.stringify({ + type: 'subscribe', + competition_id: competitionId + })) } self.update() } + self.ws.onmessage = function (event) { var message = null @@ -375,11 +372,6 @@ return 'red' } - self.toggleDefaultWorkers = function () { - self.showDefaultWorkers = !self.showDefaultWorkers - self.update() - } - self.getStatusIcon = function (worker) { if (self.displayStatus(worker) === 'available') return 'check circle' if (self.displayStatus(worker) === 'busy') return 'clock' diff --git a/src/utils/consumers.py b/src/utils/consumers.py index 6d076bac9..003787870 100644 --- a/src/utils/consumers.py +++ b/src/utils/consumers.py @@ -1,59 +1,22 @@ +# utils/consumers.py + import asyncio -import json import logging -import time - -from competitions.models import Competition from asgiref.sync import sync_to_async from channels.generic.websocket import AsyncJsonWebsocketConsumer -from django_redis import get_redis_connection - -from utils.worker_utils import WORKER_HEARTBEAT_TTL, WORKERS_REGISTRY_KEY - -logger = logging.getLogger(__name__) -r = get_redis_connection("default") +from utils.worker_utils import fetch_compute_workers -def _load_snapshot(competition_queue_name=None): - """ - Charge les workers depuis Redis. - - workers par défaut : toujours inclus (queue_source == 'default') - - workers privés : inclus uniquement si leur queue_source correspond - à la queue de la compétition courante - """ - raw = r.hgetall(WORKERS_REGISTRY_KEY) - workers = [] - private_workers = [] - now = time.time() - - for _, value in raw.items(): - try: - worker = json.loads(value) - except Exception: - continue - - if now - worker.get("last_seen", 0) > WORKER_HEARTBEAT_TTL: - continue - - if worker.get("queue_source") == "default": - workers.append(worker) - else: - # Worker privé : n'afficher que si la queue correspond à la compétition - if competition_queue_name and worker.get("queue_source") == competition_queue_name: - private_workers.append(worker) - - workers.sort(key=lambda x: x.get("hostname", "")) - private_workers.sort(key=lambda x: (x.get("queue_source", ""), x.get("hostname", ""))) - return workers, private_workers +logger = logging.getLogger(__name__) def _get_competition_queue_name(competition_id): - """Retourne le nom de la queue de la compétition, ou None.""" if not competition_id: return None try: + from competitions.models import Competition competition = Competition.objects.select_related("queue").get(pk=competition_id) if competition.queue and competition.queue.name: return competition.queue.name @@ -62,6 +25,19 @@ def _get_competition_queue_name(competition_id): return None +def _load_snapshot(competition_queue_name=None): + workers, private_workers = fetch_compute_workers() + + if competition_queue_name: + private_workers = [ + w for w in private_workers + if w.get("queue_source") == competition_queue_name + ] + else: + private_workers = [] + + return workers, private_workers + class ComputeWorkersConsumer(AsyncJsonWebsocketConsumer): async def connect(self): @@ -88,11 +64,11 @@ async def disconnect(self, close_code): pass async def receive_json(self, content): - logger.debug("WebSocket received: %s", content) if content.get("type") == "subscribe": competition_id = content.get("competition_id") - self._competition_queue_name = await sync_to_async(_get_competition_queue_name)( - competition_id) + self._competition_queue_name = await sync_to_async( + _get_competition_queue_name + )(competition_id) self._subscribed.set() async def _push_workers_loop(self): @@ -100,7 +76,9 @@ async def _push_workers_loop(self): try: await asyncio.wait_for(self._subscribed.wait(), timeout=5.0) except asyncio.TimeoutError: - logger.warning("WebSocket subscribe timeout, proceeding without competition filter") + logger.warning( + "WebSocket subscribe timeout, proceeding without competition filter" + ) while self._running: workers, private_workers = await sync_to_async(_load_snapshot)( @@ -118,25 +96,4 @@ async def _push_workers_loop(self): break await asyncio.sleep(3) except asyncio.CancelledError: - pass - - async def worker_health(self, event): - worker = event["worker"] - is_default = worker.get("queue_source") == "default" - is_mine = ( - self._competition_queue_name is not None - and worker.get("queue_source") == self._competition_queue_name - ) - if not is_default and not is_mine: - return - try: - workers, private_workers = await sync_to_async(_load_snapshot)( - self._competition_queue_name - ) - await self.send_json({ - "type": "workers.snapshot", - "workers": workers, - "private_workers": private_workers, - }) - except RuntimeError: - pass + pass \ No newline at end of file diff --git a/src/utils/worker_utils.py b/src/utils/worker_utils.py index f1b9a474b..ef48b0e55 100644 --- a/src/utils/worker_utils.py +++ b/src/utils/worker_utils.py @@ -1,28 +1,111 @@ +# utils/worker_utils.py + +import logging +import time + +import requests +from django.conf import settings from queues.models import Queue -WORKERS_REGISTRY_KEY = "workers:registry" -WORKER_HEARTBEAT_TTL = 180 +logger = logging.getLogger(__name__) +PIDBOX_SUFFIX = ".celery.pidbox" -def extract_queue_names(active_queues): - names = set() - for q in active_queues or []: - if isinstance(q, dict) and q.get("name"): - names.add(q["name"]) - return names +def _rabbitmq_auth(): + return (settings.RABBITMQ_DEFAULT_USER, settings.RABBITMQ_DEFAULT_PASS) -def known_compute_queue_names(): - return set( - Queue.objects.exclude(name__isnull=True) - .exclude(name="") - .values_list("name", flat=True) - ) + +def _rabbitmq_base_url(): + return f"http://{settings.RABBITMQ_HOST}:{settings.RABBITMQ_MANAGEMENT_PORT}/api" + + +def _build_vhost_to_source_map(): + """{ vhost_string: queue_source_name }. Default vhost '/' → 'default'.""" + mapping = {"/": "default"} + for q in Queue.objects.exclude(vhost__isnull=True).values("vhost", "name"): + mapping[str(q["vhost"])] = q["name"] + return mapping -def is_compute_worker(worker_name, queue_names, known_queue_names): - return ( - bool(queue_names & known_queue_names) - or "compute-worker" in queue_names - or worker_name.startswith("compute-worker") +def _fetch_all_queues(): + resp = requests.get( + f"{_rabbitmq_base_url()}/queues", + auth=_rabbitmq_auth(), + timeout=5, ) + resp.raise_for_status() + return resp.json() + + +def is_compute_worker(worker_name): + return worker_name.startswith("compute-worker") + + +def fetch_compute_workers(): + try: + all_queues = _fetch_all_queues() + except Exception: + logger.exception("Failed to fetch queues from RabbitMQ Management API") + return [], [] + + try: + vhost_to_source = _build_vhost_to_source_map() + except Exception: + logger.exception("Failed to build vhost→source map") + return [], [] + + # Grouper par vhost + by_vhost: dict[str, list] = {} + for q in all_queues: + by_vhost.setdefault(q["vhost"], []).append(q) + + workers = [] + private_workers = [] + now = time.time() + + for vhost, queues in by_vhost.items(): + source_name = vhost_to_source.get(vhost) + if not source_name: + continue + + cw_queue = next((q for q in queues if q["name"] == "compute-worker"), None) + messages_unacked = cw_queue.get("messages_unacknowledged", 0) if cw_queue else 0 + cw_consumers = cw_queue.get("consumers", 0) if cw_queue else 0 + + # Pidbox queues 1 worker / queue + for pidbox_q in queues: + name = pidbox_q["name"] + if not (name.endswith(PIDBOX_SUFFIX) and name.startswith("compute-worker@")): + continue + + hostname = name[: -len(PIDBOX_SUFFIX)] + if not is_compute_worker(hostname): + continue + + pidbox_alive = pidbox_q.get("consumers", 0) > 0 + + if not pidbox_alive or cw_consumers == 0: + status, running_jobs = "unavailable", 0 + elif messages_unacked > 0: + status, running_jobs = "busy", messages_unacked + else: + status, running_jobs = "available", 0 + + worker = { + "hostname": hostname, + "status": status, + "running_jobs": running_jobs, + "last_seen": now, + "queue_source": source_name, + "queue_names": ["compute-worker"], + } + + if source_name == "default": + workers.append(worker) + else: + private_workers.append(worker) + + workers.sort(key=lambda x: x["hostname"]) + private_workers.sort(key=lambda x: (x["queue_source"], x["hostname"])) + return workers, private_workers \ No newline at end of file
|---|