From 5d9dfbd40d419d776f389be481d7f9c04a89e615 Mon Sep 17 00:00:00 2001 From: Idir Chikhoune Date: Tue, 31 Mar 2026 14:08:50 +0200 Subject: [PATCH 01/20] feature ok / needs to be tested --- src/apps/competitions/tasks.py | 26 +- src/celery_config.py | 9 + .../riot/competitions/detail/detail.tag | 835 ++++++++++++------ src/utils/worker_utils.py | 109 +++ 4 files changed, 704 insertions(+), 275 deletions(-) diff --git a/src/apps/competitions/tasks.py b/src/apps/competitions/tasks.py index eeaa2ffae..27f8e4061 100644 --- a/src/apps/competitions/tasks.py +++ b/src/apps/competitions/tasks.py @@ -8,14 +8,10 @@ from io import BytesIO from tempfile import TemporaryDirectory, NamedTemporaryFile -# import json -# import urllib - import oyaml as yaml import requests from celery._state import app_or_default from django.conf import settings -# from django_redis import get_redis_connection from django.core.exceptions import ObjectDoesNotExist from django.core.files.base import ContentFile from django.db.models import Subquery, OuterRef, Count, Case, When, Value, F @@ -24,10 +20,9 @@ from django.utils.timezone import now from rest_framework.exceptions import ValidationError -from celery_config import app # , app_for_vhost +from celery_config import app from competitions.models import Submission, CompetitionCreationTaskStatus, SubmissionDetails, Competition, \ CompetitionDump, Phase -# from queues.models import Queue from competitions.unpackers.utils import CompetitionUnpackingException from competitions.unpackers.v1 import V15Unpacker from competitions.unpackers.v2 import V2Unpacker @@ -36,12 +31,9 @@ from datasets.models import Data from utils.data import make_url_sassy from utils.email import codalab_send_markdown_email -from channels.layers import get_channel_layer -from asgiref.sync import async_to_sync import logging -# from utils.worker_utils import WORKER_HEARTBEAT_TTL, WORKERS_REGISTRY_KEY, extract_queue_names, is_compute_worker, known_compute_queue_names logger = logging.getLogger(__name__) COMPETITION_FIELDS = [ @@ -798,24 +790,8 @@ def submission_status_cleanup(): submissions = Submission.objects.filter(status=Submission.RUNNING, has_children=False).select_related('phase', 'parent') for sub in submissions: - # Check if the submission has been running for 24 hours longer than execution_time_limit if sub.started_when < now() - timedelta(milliseconds=(3600000 * 24) + sub.phase.execution_time_limit): if sub.parent is not None: sub.parent.cancel(status=Submission.FAILED) else: sub.cancel(status=Submission.FAILED) - - -# ------------------------------------------------- -def _broadcast_worker_state(payload): - channel_layer = get_channel_layer() - if not channel_layer: - return - - async_to_sync(channel_layer.group_send)( - "compute_workers", - { - "type": "worker.health", - "worker": payload, - }, - ) diff --git a/src/celery_config.py b/src/celery_config.py index e8ba5961e..614052be7 100644 --- a/src/celery_config.py +++ b/src/celery_config.py @@ -42,3 +42,12 @@ def app_for_vhost(vhost): vhost_app.conf.task_queues = app.conf.task_queues _vhost_apps[vhost] = vhost_app return _vhost_apps[vhost] + + + +app.conf.beat_schedule = { + "refresh-compute-worker-health": { + "task": "chemin.vers.refresh_compute_worker_health", + "schedule": 5.0, + }, +} \ No newline at end of file diff --git a/src/static/riot/competitions/detail/detail.tag b/src/static/riot/competitions/detail/detail.tag index 7bed78b29..f1dce44fa 100644 --- a/src/static/riot/competitions/detail/detail.tag +++ b/src/static/riot/competitions/detail/detail.tag @@ -6,25 +6,13 @@ - - -