From 37351fa62f9c177de2858dfa2aaa53001cd778eb Mon Sep 17 00:00:00 2001 From: Parag More Date: Tue, 30 Jun 2026 22:21:48 +0530 Subject: [PATCH] feat: merge worker-level alerts into alerts and history endpoints Read worker-level alerts from pool-worker ext data (getWrkExtData 'alerts' / 'alerts-history') and merge them into the site alerts and history-alerts responses. Worker alerts use the same flat shape as thing alerts, so they need no special-casing. Workers opt in via ALERT_EXT_DATA_WORKER_TYPES. --- tests/unit/handlers/alerts.handlers.test.js | 76 ++++++++++++++++++- workers/lib/constants.js | 3 + .../lib/server/handlers/alerts.handlers.js | 25 +++++- 3 files changed, 99 insertions(+), 5 deletions(-) diff --git a/tests/unit/handlers/alerts.handlers.test.js b/tests/unit/handlers/alerts.handlers.test.js index 9a05c1d..5d22158 100644 --- a/tests/unit/handlers/alerts.handlers.test.js +++ b/tests/unit/handlers/alerts.handlers.test.js @@ -833,8 +833,10 @@ test('getSiteAlerts - type combines with existing filter (AND)', async (t) => { test('getSiteAlerts - type pushes thing.type constraint to the worker query', async (t) => { let captured - const mockCtx = createMockCtxWithOrks([{ rpcPublicKey: 'key1' }], async (_pk, _method, params) => { - captured = params + const mockCtx = createMockCtxWithOrks([{ rpcPublicKey: 'key1' }], async (_pk, method, params) => { + // getSiteAlerts also reads worker ext data via getWrkExtData; only capture + // the listThings call this assertion is about. + if (method === 'listThings') captured = params return typedThings() }) await getSiteAlerts(mockCtx, { query: { type: 'operational' } }) @@ -871,10 +873,76 @@ test('getAlertsHistory - type=all returns everything', async (t) => { test('getAlertsHistory - type pushes thing.type constraint to the worker query', async (t) => { let captured - const mockCtx = createMockCtxWithOrks([{ rpcPublicKey: 'key1' }], async (_pk, _method, params) => { - captured = params + const mockCtx = createMockCtxWithOrks([{ rpcPublicKey: 'key1' }], async (_pk, method, params) => { + // getAlertsHistory also reads worker ext data via getWrkExtData; only capture + // the getHistoricalLogs call this assertion is about. + if (method === 'getHistoricalLogs') captured = params return typedHistory() }) await getAlertsHistory(mockCtx, { query: { start: 1, end: 9000, type: 'miner' } }) t.alike(captured.query, { 'thing.type': { $regex: '^miner(-|$)' } }, 'miner constraint pushed to getHistoricalLogs') }) + +// ==================== Worker-level alert merge (ext data) ==================== + +const oceanExtAlert = (createdAt = 1000, uuid = 'datum-uuid-1') => ({ + name: 'Datum_Offline', + code: 'ocean', + description: 'DATUM gateway is offline', + severity: 'critical', + createdAt, + uuid, + id: 'minerpool-ocean', + deviceId: 'minerpool-ocean', + type: 'minerpool', + container: null, + position: null +}) + +test('getSiteAlerts - merges worker-level alerts from ext data', async (t) => { + const mockCtx = createMockCtxWithOrks([{ rpcPublicKey: 'key1' }], async (_pk, method) => { + if (method === 'listThings') { + return [{ id: 'm1', type: 'miner', code: 'S19', info: {}, last: { alerts: [{ severity: 'high', name: 'fan' }] } }] + } + if (method === 'getWrkExtData') return [{ ts: 1000, alerts: [oceanExtAlert()] }] + return [] + }) + const result = await getSiteAlerts(mockCtx, { query: {} }) + t.ok(result.alerts.some(a => a.name === 'Datum_Offline'), 'worker alert merged into site alerts') + t.ok(result.alerts.some(a => a.name === 'fan'), 'thing alert still present') + t.is(result.summary.critical, 1, 'critical worker alert counted in summary') + t.is(result.summary.high, 1, 'thing alert still counted') +}) + +test('getSiteAlerts - worker alerts respect the type filter (minerpool is operational)', async (t) => { + const mockCtx = createMockCtxWithOrks([{ rpcPublicKey: 'key1' }], async (_pk, method) => { + if (method === 'getWrkExtData') return [{ ts: 1000, alerts: [oceanExtAlert()] }] + return [] + }) + const operational = await getSiteAlerts(mockCtx, { query: { type: 'operational' } }) + t.ok(operational.alerts.some(a => a.name === 'Datum_Offline'), 'kept under operational') + const miner = await getSiteAlerts(mockCtx, { query: { type: 'miner' } }) + t.absent(miner.alerts.find(a => a.name === 'Datum_Offline'), 'excluded under miner') +}) + +test('getAlertsHistory - merges worker-level alert history from ext data', async (t) => { + const mockCtx = createMockCtxWithOrks([{ rpcPublicKey: 'key1' }], async (_pk, method) => { + if (method === 'getHistoricalLogs') return [makeHistoryAlert('m1', 1000, 'high', { type: 'miner' })] + if (method === 'getWrkExtData') return [{ ts: 5000, alerts: [oceanExtAlert(5000, 'datum-uuid-2')] }] + return [] + }) + const result = await getAlertsHistory(mockCtx, { query: { start: 1, end: 9000 } }) + t.ok(result.alerts.some(a => a.name === 'Datum_Offline'), 'worker history alert merged') + t.ok(result.alerts.some(a => a.type === 'miner'), 'thing history alert still present') +}) + +test('getAlertsHistory - dedupes repeated worker alerts by uuid', async (t) => { + const mockCtx = createMockCtxWithOrks([{ rpcPublicKey: 'key1' }], async (_pk, method) => { + if (method === 'getHistoricalLogs') return [] + // same alert reported in two buckets (same uuid) + if (method === 'getWrkExtData') return [{ ts: 5000, alerts: [oceanExtAlert(5000, 'dup')] }, { ts: 6000, alerts: [oceanExtAlert(5000, 'dup')] }] + return [] + }) + const result = await getAlertsHistory(mockCtx, { query: { start: 1, end: 9000 } }) + t.is(result.alerts.filter(a => a.uuid === 'dup').length, 1, 'duplicate uuid collapsed to one') +}) diff --git a/workers/lib/constants.js b/workers/lib/constants.js index 82c9f14..6551bd0 100644 --- a/workers/lib/constants.js +++ b/workers/lib/constants.js @@ -352,6 +352,8 @@ const ALERTS_FILTER_OPERATORS = ['$eq', '$ne', '$in', '$nin', '$gt', '$gte', '$l const ALERT_TYPE_CATEGORIES = ['all', 'operational', 'miner'] +const ALERT_EXT_DATA_WORKER_TYPES = [WORKER_TYPES.MINERPOOL] + // Matches the miner base type and its subtypes (e.g. 'miner-am-s19xp'), not 'minerals'. const MINER_TYPE_REGEX = '^miner(-|$)' @@ -844,6 +846,7 @@ module.exports = { HISTORY_SEARCH_FIELDS, ALERTS_FILTER_OPERATORS, ALERT_TYPE_CATEGORIES, + ALERT_EXT_DATA_WORKER_TYPES, MINER_TYPE_REGEX, SITE_ALERTS_THING_QUERY_MAP, HISTORY_ALERTS_QUERY_MAP, diff --git a/workers/lib/server/handlers/alerts.handlers.js b/workers/lib/server/handlers/alerts.handlers.js index 77db375..042d66e 100644 --- a/workers/lib/server/handlers/alerts.handlers.js +++ b/workers/lib/server/handlers/alerts.handlers.js @@ -13,7 +13,8 @@ const { ALERTS_FILTER_OPERATORS, MINER_TYPE_REGEX, SITE_ALERTS_THING_QUERY_MAP, - HISTORY_ALERTS_QUERY_MAP + HISTORY_ALERTS_QUERY_MAP, + ALERT_EXT_DATA_WORKER_TYPES } = require('../../constants') const { parseJsonQueryParam, validateFilter, applyMongoFilter, combineAnd, deduplicateAlerts } = require('../../utils') @@ -39,6 +40,23 @@ function extractAlertsFromThings (things) { return alerts } +// Worker alerts use the same flat shape as thing alerts, so they merge as-is. +async function fetchWorkerExtAlerts (ctx, query) { + const alerts = [] + for (const type of ALERT_EXT_DATA_WORKER_TYPES) { + let results + try { + results = await ctx.dataProxy.requestDataMap(RPC_METHODS.GET_WRK_EXT_DATA, { type, query }) + } catch { + continue + } + for (const entry of results.flat()) { + if (Array.isArray(entry?.alerts)) alerts.push(...entry.alerts) + } + } + return alerts +} + function matchesSearch (item, search, fields) { if (!search) return true const lowerSearch = search.toLowerCase() @@ -155,6 +173,8 @@ async function getSiteAlerts (ctx, req) { const things = results.flat() let alerts = extractAlertsFromThings(things) + alerts = alerts.concat(await fetchWorkerExtAlerts(ctx, { key: 'alerts' })) + // Re-apply on the merged result for per-alert fields and multi-rack correctness. alerts = applyMongoFilter(alerts, combineAnd(filter, typeFilter)) alerts = alerts.filter(a => matchesSearch(a, search, SITE_ALERTS_SEARCH_FIELDS)) @@ -197,6 +217,9 @@ async function getAlertsHistory (ctx, req) { }) let alerts = results.flat().map(flattenHistoryAlert) + + // Worker history entries are already flat, so concat directly (no flatten). + alerts = alerts.concat(await fetchWorkerExtAlerts(ctx, { key: 'alerts-history', start, end })) alerts = deduplicateAlerts(alerts) // Re-apply on the merged result for global correctness.