Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
ea8c082
feature ok / needs to be tested
IdirLISN Mar 31, 2026
3052ac6
feature deactivated by default + hidden behind a button by default + v…
IdirLISN Apr 2, 2026
115f74c
worker monitoring button behind user menu
IdirLISN Apr 2, 2026
15ef892
files blacked for fixing the formatting issues
IdirLISN Apr 2, 2026
bb80f24
fixing syntax and format
IdirLISN Apr 2, 2026
84b8922
rebase on dev
IdirLISN May 27, 2026
bda93c1
clean feature
IdirLISN May 27, 2026
923c16c
git rebase continue
IdirLISN May 27, 2026
f92891f
feature in progress
IdirLISN Apr 7, 2026
9be3ec9
git rebase continue
IdirLISN May 27, 2026
3231a2a
git rebase continue
IdirLISN May 27, 2026
8fae6a5
compute worker monitoring on private queues (amazing stuff)
IdirLISN May 7, 2026
4b870fc
test number 245
IdirLISN May 7, 2026
7a8dd0b
git rebase continue
IdirLISN May 27, 2026
de9a401
rebase and fix incoming
IdirLISN May 27, 2026
99347b2
rebase and fix incoming
IdirLISN May 27, 2026
b4fd5a6
feature clean
IdirLISN May 27, 2026
88ac295
conflicts solved
IdirLISN May 28, 2026
a49cf13
conflicts solved
IdirLISN May 28, 2026
f43d5fd
private CW pb solved
IdirLISN May 28, 2026
e430db5
linter fix
IdirLISN May 28, 2026
2f7d290
remove comment
IdirLISN May 28, 2026
f08280f
feature en cours
IdirLISN Jun 1, 2026
2aa19aa
monitor queues feature for admin ok
IdirLISN Jun 2, 2026
2ee6d3b
UX/UI improved
IdirLISN Jun 2, 2026
9f0e970
UI/UX improvement
IdirLISN Jun 4, 2026
c15117e
final push
IdirLISN Jun 4, 2026
ddca9b4
adding colors (final push)
IdirLISN Jun 4, 2026
3f8033c
panel resize
IdirLISN Jun 4, 2026
1c42616
UI cleaning
IdirLISN Jun 4, 2026
6dd6214
save panel state
IdirLISN Jun 4, 2026
c440a67
revert details.tag before CW monitoring merge
IdirLISN Jun 4, 2026
4d77c5f
worker jobs fix first try
IdirLISN Jun 4, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
579 changes: 319 additions & 260 deletions src/apps/competitions/tasks.py

Large diffs are not rendered by default.

4 changes: 0 additions & 4 deletions src/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,10 +276,6 @@
'task': 'profiles.tasks.clean_non_activated_users',
'schedule': timedelta(days=1), # Run every 24 hours
},
"refresh_compute_worker_health": {
"task": "competitions.tasks.refresh_compute_worker_health",
"schedule": 60,
},
}
CELERY_TIMEZONE = 'UTC'
CELERY_WORKER_PREFETCH_MULTIPLIER = 1
Expand Down
2 changes: 0 additions & 2 deletions src/static/riot/competitions/detail/_header.tag
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,11 @@
Migrate
</button>

<!--
<worker-monitor-toggle
if="{competition.admin}"
can_view_workers_panel="true"
competition_id="{ competition.id }">
</worker-monitor-toggle>
-->

</div>
<div class="row">
Expand Down
679 changes: 48 additions & 631 deletions src/static/riot/competitions/detail/detail.tag

Large diffs are not rendered by default.

1,038 changes: 821 additions & 217 deletions src/static/riot/competitions/detail/worker-monitor-toggle.tag

Large diffs are not rendered by default.

16 changes: 16 additions & 0 deletions src/templates/pages/monitor_queues.html
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,21 @@ <h1>Monitor queues</h1>
</div>
</div>
{% endif %}

{% if user.is_superuser %}
<div class="ui container">

<div class="ui segment">
<worker-monitor-toggle
can_view_workers_panel="true"
all_workers="true"
inline_mode="true">
</worker-monitor-toggle>
</div>

<div id="external_monitors" class="ui two column grid">
</div>
</div>
{% endif %}

{% endblock %}
98 changes: 29 additions & 69 deletions src/utils/consumers.py
Original file line number Diff line number Diff line change
@@ -1,59 +1,19 @@
import asyncio
import json
import logging
import time

from competitions.models import Competition

from asgiref.sync import sync_to_async
from channels.generic.websocket import AsyncJsonWebsocketConsumer
from django_redis import get_redis_connection

from utils.worker_utils import WORKER_HEARTBEAT_TTL, WORKERS_REGISTRY_KEY
from utils.worker_utils import fetch_compute_workers

logger = logging.getLogger(__name__)

r = get_redis_connection("default")


def _load_snapshot(competition_queue_name=None):
"""
Load the workers from Redis.
- default workers: always included (queue_source == 'default')
- private workers: included only if their queue_source matches
the queue of the current competition

Returns a ``(workers, private_workers)`` tuple of lists of worker dicts.
"""
raw = r.hgetall(WORKERS_REGISTRY_KEY)
workers = []
private_workers = []
now = time.time()

for _, value in raw.items():
try:
worker = json.loads(value)
except Exception:
# Registry entries that cannot be decoded as JSON are skipped.
continue

# Drop entries whose last heartbeat is older than WORKER_HEARTBEAT_TTL.
if now - worker.get("last_seen", 0) > WORKER_HEARTBEAT_TTL:
continue

if worker.get("queue_source") == "default":
workers.append(worker)
else:
# Private worker: only show it if its queue matches the competition's queue
if competition_queue_name and worker.get("queue_source") == competition_queue_name:
private_workers.append(worker)

# Stable ordering: default workers by hostname, private ones by queue then hostname.
workers.sort(key=lambda x: x.get("hostname", ""))
private_workers.sort(key=lambda x: (x.get("queue_source", ""), x.get("hostname", "")))
return workers, private_workers


def _get_competition_queue_name(competition_id):
"""Retourne le nom de la queue de la compétition, ou None."""
if not competition_id:
return None
try:
from competitions.models import Competition
competition = Competition.objects.select_related("queue").get(pk=competition_id)
if competition.queue and competition.queue.name:
return competition.queue.name
Expand All @@ -62,6 +22,22 @@ def _get_competition_queue_name(competition_id):
return None


def _load_snapshot(competition_queue_name=None, show_all=False):
    """Return ``(workers, private_workers)`` narrowed for one subscriber.

    The first list is passed through as fetched. The private list is kept
    whole when ``show_all`` is truthy; otherwise it is reduced to the entries
    whose ``queue_source`` equals ``competition_queue_name``, or emptied when
    no competition queue name is given.
    """
    default_pool, private_pool = fetch_compute_workers()

    if show_all:
        # Every private worker stays visible.
        return default_pool, private_pool

    if not competition_queue_name:
        return default_pool, []

    visible = [
        entry for entry in private_pool
        if entry.get("queue_source") == competition_queue_name
    ]
    return default_pool, visible


class ComputeWorkersConsumer(AsyncJsonWebsocketConsumer):

async def connect(self):
Expand All @@ -72,6 +48,7 @@ async def connect(self):
await self.accept()
await self.channel_layer.group_add("compute_workers", self.channel_name)
self._competition_queue_name = None
self._show_all = False
self._running = True
self._subscribed = asyncio.Event()
self._task = asyncio.create_task(self._push_workers_loop())
Expand All @@ -88,23 +65,27 @@ async def disconnect(self, close_code):
pass

async def receive_json(self, content):
    """Handle a JSON message from the client; only ``subscribe`` is acted on.

    A ``subscribe`` message either asks for every worker (``all_workers``)
    or carries a ``competition_id`` whose queue name selects the private
    workers pushed to this socket. Any other message type is ignored.
    """
    if content.get("type") != "subscribe":
        return

    # The payload is client-controlled, so "all_workers" is honoured only for
    # superusers -- the same condition the monitor_queues template checks
    # before rendering the all-workers panel.
    # NOTE(review): assumes the Channels auth middleware populates
    # self.scope["user"]; when it is absent the request falls back to the
    # per-competition filter. Confirm against the routing setup.
    user = self.scope.get("user")
    wants_all = bool(content.get("all_workers"))

    if wants_all and getattr(user, "is_superuser", False):
        self._show_all = True
    else:
        if wants_all:
            logger.warning("Ignoring all_workers subscribe from non-superuser")
        # A later per-competition subscribe must replace an earlier
        # all-workers view instead of silently keeping it.
        self._show_all = False
        competition_id = content.get("competition_id")
        self._competition_queue_name = await sync_to_async(
            _get_competition_queue_name
        )(competition_id)

    self._subscribed.set()

async def _push_workers_loop(self):
try:
try:
await asyncio.wait_for(self._subscribed.wait(), timeout=5.0)
except asyncio.TimeoutError:
logger.warning("WebSocket subscribe timeout, proceeding without competition filter")
logger.warning("WebSocket subscribe timeout, proceeding without filter")

while self._running:
workers, private_workers = await sync_to_async(_load_snapshot)(
self._competition_queue_name
competition_queue_name=self._competition_queue_name,
show_all=self._show_all,
)
if not self._running:
break
Expand All @@ -119,24 +100,3 @@ async def _push_workers_loop(self):
await asyncio.sleep(3)
except asyncio.CancelledError:
pass

async def worker_health(self, event):
"""React to a worker health event by sending the client a fresh snapshot.

The event is ignored unless ``event["worker"]`` belongs to the default
queue or to the queue of the competition this socket subscribed to.
"""
worker = event["worker"]
is_default = worker.get("queue_source") == "default"
is_mine = (
self._competition_queue_name is not None
and worker.get("queue_source") == self._competition_queue_name
)
# Workers of other competitions' private queues are not relevant here.
if not is_default and not is_mine:
return
try:
# Reload the whole snapshot rather than patching a single worker entry.
workers, private_workers = await sync_to_async(_load_snapshot)(
self._competition_queue_name
)
await self.send_json({
"type": "workers.snapshot",
"workers": workers,
"private_workers": private_workers,
})
except RuntimeError:
# NOTE(review): presumably raised when the socket is already closed -- confirm.
pass
130 changes: 111 additions & 19 deletions src/utils/worker_utils.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,120 @@
import logging
import time

import requests
from django.conf import settings
from queues.models import Queue

WORKERS_REGISTRY_KEY = "workers:registry"
WORKER_HEARTBEAT_TTL = 180
logger = logging.getLogger(__name__)

PIDBOX_SUFFIX = ".celery.pidbox"

def extract_queue_names(active_queues):
    """Collect the distinct, non-empty ``name`` values of the dict entries.

    A falsy ``active_queues`` (``None``, empty list) yields an empty set;
    entries that are not dicts or have no truthy ``name`` are ignored.
    """
    if not active_queues:
        return set()
    return {
        entry["name"]
        for entry in active_queues
        if isinstance(entry, dict) and entry.get("name")
    }

def _rabbitmq_auth():
    """Return the ``(user, password)`` pair read from the RabbitMQ settings."""
    username = settings.RABBITMQ_DEFAULT_USER
    password = settings.RABBITMQ_DEFAULT_PASS
    return username, password

def known_compute_queue_names():
    """Return the set of ``Queue`` names, leaving out null and empty ones."""
    named_queues = Queue.objects.exclude(name__isnull=True).exclude(name="")
    return set(named_queues.values_list("name", flat=True))

def _rabbitmq_base_url():
    """Build the ``/api`` base URL from the RabbitMQ host and management port settings."""
    base_url = f"http://{settings.RABBITMQ_HOST}:{settings.RABBITMQ_MANAGEMENT_PORT}/api"
    return base_url


def _build_vhost_to_source_map():
    """Map each vhost string to its queue source name.

    The vhost ``"/"`` maps to ``"default"``; every ``Queue`` row with a
    non-null ``vhost`` adds ``str(vhost) -> name``, later rows overriding
    earlier ones on a clash.
    """
    rows = Queue.objects.exclude(vhost__isnull=True).values("vhost", "name")
    vhost_to_source = {"/": "default"}
    vhost_to_source.update((str(row["vhost"]), row["name"]) for row in rows)
    return vhost_to_source


def is_compute_worker(worker_name, queue_names, known_queue_names):
return (
bool(queue_names & known_queue_names)
or "compute-worker" in queue_names
or worker_name.startswith("compute-worker")
def _fetch_all_queues():
"""Return the decoded JSON body of the management API ``/queues`` endpoint.

Exceptions from ``requests`` (connection failure, timeout) and the error
raised by ``raise_for_status`` on an error response propagate to the caller.
"""
resp = requests.get(
f"{_rabbitmq_base_url()}/queues",
auth=_rabbitmq_auth(),
# Bounded wait so an unresponsive broker cannot block the caller indefinitely.
timeout=5,
)
resp.raise_for_status()
return resp.json()


def is_compute_worker(worker_name):
    """Tell whether ``worker_name`` begins with the ``compute-worker`` prefix."""
    prefix = "compute-worker"
    return worker_name.startswith(prefix)


def fetch_compute_workers():
    """Build a snapshot of compute workers from the RabbitMQ queue listing.

    Workers are discovered through queues named
    ``compute-worker@...<PIDBOX_SUFFIX>`` and classified per vhost using the
    ``compute-worker`` queue's consumer and unacknowledged-message counts.

    Returns:
        A ``(workers, private_workers)`` tuple of lists of dicts. Entries whose
        vhost maps to ``"default"`` go in the first list, all others in the
        second. Both lists are empty when the queue listing or the vhost map
        cannot be obtained (the failure is logged).
    """
    try:
        queue_listing = _fetch_all_queues()
    except Exception:
        logger.exception("Failed to fetch queues from RabbitMQ Management API")
        return [], []

    try:
        source_by_vhost = _build_vhost_to_source_map()
    except Exception:
        logger.exception("Failed to build vhost→source map")
        return [], []

    # Group the queue descriptions per vhost, keeping the listing's order.
    grouped = {}
    for entry in queue_listing:
        vhost_key = entry["vhost"]
        if vhost_key not in grouped:
            grouped[vhost_key] = []
        grouped[vhost_key].append(entry)

    default_pool = []
    private_pool = []
    timestamp = time.time()

    for vhost_key, vhost_queues in grouped.items():
        source = source_by_vhost.get(vhost_key)
        if not source:
            # Vhosts without a known source are not reported.
            continue

        # Counters of the vhost's "compute-worker" queue, zero when it is absent.
        task_queue = None
        for candidate in vhost_queues:
            if candidate["name"] == "compute-worker":
                task_queue = candidate
                break
        if task_queue:
            unacked_total = task_queue.get("messages_unacknowledged", 0)
            consumer_count = task_queue.get("consumers", 0)
        else:
            unacked_total = 0
            consumer_count = 0

        # One (hostname, pidbox_alive) pair per matching pidbox queue; a pidbox
        # queue without consumers marks its worker as unavailable below.
        detected = []
        for queue in vhost_queues:
            queue_name = queue["name"]
            if not queue_name.endswith(PIDBOX_SUFFIX):
                continue
            if not queue_name.startswith("compute-worker@"):
                continue
            host = queue_name[: -len(PIDBOX_SUFFIX)]
            if is_compute_worker(host):
                detected.append((host, queue.get("consumers", 0) > 0))

        # Hand out the unacknowledged messages one per reachable worker, in
        # listing order; workers left over once they run out are "available".
        remaining = unacked_total
        for host, pidbox_alive in detected:
            reachable = pidbox_alive and consumer_count != 0
            if not reachable:
                state, job_count = "unavailable", 0
            elif remaining > 0:
                state, job_count = "busy", 1
                remaining -= 1
            else:
                state, job_count = "available", 0

            record = {
                "hostname": host,
                "status": state,
                "running_jobs": job_count,
                "last_seen": timestamp,
                "queue_source": source,
                "queue_names": ["compute-worker"],
            }
            target = default_pool if source == "default" else private_pool
            target.append(record)

    default_pool.sort(key=lambda item: item["hostname"])
    private_pool.sort(key=lambda item: (item["queue_source"], item["hostname"]))
    return default_pool, private_pool