Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 72 additions & 4 deletions tests/unit/handlers/alerts.handlers.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -833,8 +833,10 @@ test('getSiteAlerts - type combines with existing filter (AND)', async (t) => {

test('getSiteAlerts - type pushes thing.type constraint to the worker query', async (t) => {
let captured
const mockCtx = createMockCtxWithOrks([{ rpcPublicKey: 'key1' }], async (_pk, _method, params) => {
captured = params
const mockCtx = createMockCtxWithOrks([{ rpcPublicKey: 'key1' }], async (_pk, method, params) => {
// getSiteAlerts also reads worker ext data via getWrkExtData; only capture
// the listThings call this assertion is about.
if (method === 'listThings') captured = params
return typedThings()
})
await getSiteAlerts(mockCtx, { query: { type: 'operational' } })
Expand Down Expand Up @@ -871,10 +873,76 @@ test('getAlertsHistory - type=all returns everything', async (t) => {

test('getAlertsHistory - type pushes thing.type constraint to the worker query', async (t) => {
let captured
const mockCtx = createMockCtxWithOrks([{ rpcPublicKey: 'key1' }], async (_pk, _method, params) => {
captured = params
const mockCtx = createMockCtxWithOrks([{ rpcPublicKey: 'key1' }], async (_pk, method, params) => {
// getAlertsHistory also reads worker ext data via getWrkExtData; only capture
// the getHistoricalLogs call this assertion is about.
if (method === 'getHistoricalLogs') captured = params
return typedHistory()
})
await getAlertsHistory(mockCtx, { query: { start: 1, end: 9000, type: 'miner' } })
t.alike(captured.query, { 'thing.type': { $regex: '^miner(-|$)' } }, 'miner constraint pushed to getHistoricalLogs')
})

// ==================== Worker-level alert merge (ext data) ====================

const oceanExtAlert = (createdAt = 1000, uuid = 'datum-uuid-1') => ({
name: 'Datum_Offline',
code: 'ocean',
description: 'DATUM gateway is offline',
severity: 'critical',
createdAt,
uuid,
id: 'minerpool-ocean',
deviceId: 'minerpool-ocean',
type: 'minerpool',
container: null,
position: null
})

test('getSiteAlerts - merges worker-level alerts from ext data', async (t) => {
const mockCtx = createMockCtxWithOrks([{ rpcPublicKey: 'key1' }], async (_pk, method) => {
if (method === 'listThings') {
return [{ id: 'm1', type: 'miner', code: 'S19', info: {}, last: { alerts: [{ severity: 'high', name: 'fan' }] } }]
}
if (method === 'getWrkExtData') return [{ ts: 1000, alerts: [oceanExtAlert()] }]
return []
})
const result = await getSiteAlerts(mockCtx, { query: {} })
t.ok(result.alerts.some(a => a.name === 'Datum_Offline'), 'worker alert merged into site alerts')
t.ok(result.alerts.some(a => a.name === 'fan'), 'thing alert still present')
t.is(result.summary.critical, 1, 'critical worker alert counted in summary')
t.is(result.summary.high, 1, 'thing alert still counted')
})

test('getSiteAlerts - worker alerts respect the type filter (minerpool is operational)', async (t) => {
const mockCtx = createMockCtxWithOrks([{ rpcPublicKey: 'key1' }], async (_pk, method) => {
if (method === 'getWrkExtData') return [{ ts: 1000, alerts: [oceanExtAlert()] }]
return []
})
const operational = await getSiteAlerts(mockCtx, { query: { type: 'operational' } })
t.ok(operational.alerts.some(a => a.name === 'Datum_Offline'), 'kept under operational')
const miner = await getSiteAlerts(mockCtx, { query: { type: 'miner' } })
t.absent(miner.alerts.find(a => a.name === 'Datum_Offline'), 'excluded under miner')
})

test('getAlertsHistory - merges worker-level alert history from ext data', async (t) => {
const mockCtx = createMockCtxWithOrks([{ rpcPublicKey: 'key1' }], async (_pk, method) => {
if (method === 'getHistoricalLogs') return [makeHistoryAlert('m1', 1000, 'high', { type: 'miner' })]
if (method === 'getWrkExtData') return [{ ts: 5000, alerts: [oceanExtAlert(5000, 'datum-uuid-2')] }]
return []
})
const result = await getAlertsHistory(mockCtx, { query: { start: 1, end: 9000 } })
t.ok(result.alerts.some(a => a.name === 'Datum_Offline'), 'worker history alert merged')
t.ok(result.alerts.some(a => a.type === 'miner'), 'thing history alert still present')
})

test('getAlertsHistory - dedupes repeated worker alerts by uuid', async (t) => {
const mockCtx = createMockCtxWithOrks([{ rpcPublicKey: 'key1' }], async (_pk, method) => {
if (method === 'getHistoricalLogs') return []
// same alert reported in two buckets (same uuid)
if (method === 'getWrkExtData') return [{ ts: 5000, alerts: [oceanExtAlert(5000, 'dup')] }, { ts: 6000, alerts: [oceanExtAlert(5000, 'dup')] }]
return []
})
const result = await getAlertsHistory(mockCtx, { query: { start: 1, end: 9000 } })
t.is(result.alerts.filter(a => a.uuid === 'dup').length, 1, 'duplicate uuid collapsed to one')
})
3 changes: 3 additions & 0 deletions workers/lib/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,8 @@ const ALERTS_FILTER_OPERATORS = ['$eq', '$ne', '$in', '$nin', '$gt', '$gte', '$l

const ALERT_TYPE_CATEGORIES = ['all', 'operational', 'miner']

const ALERT_EXT_DATA_WORKER_TYPES = [WORKER_TYPES.MINERPOOL]

// Matches the miner base type and its subtypes (e.g. 'miner-am-s19xp'), not 'minerals'.
const MINER_TYPE_REGEX = '^miner(-|$)'

Expand Down Expand Up @@ -844,6 +846,7 @@ module.exports = {
HISTORY_SEARCH_FIELDS,
ALERTS_FILTER_OPERATORS,
ALERT_TYPE_CATEGORIES,
ALERT_EXT_DATA_WORKER_TYPES,
MINER_TYPE_REGEX,
SITE_ALERTS_THING_QUERY_MAP,
HISTORY_ALERTS_QUERY_MAP,
Expand Down
25 changes: 24 additions & 1 deletion workers/lib/server/handlers/alerts.handlers.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ const {
ALERTS_FILTER_OPERATORS,
MINER_TYPE_REGEX,
SITE_ALERTS_THING_QUERY_MAP,
HISTORY_ALERTS_QUERY_MAP
HISTORY_ALERTS_QUERY_MAP,
ALERT_EXT_DATA_WORKER_TYPES
} = require('../../constants')
const { parseJsonQueryParam, validateFilter, applyMongoFilter, combineAnd, deduplicateAlerts } = require('../../utils')

Expand All @@ -39,6 +40,23 @@ function extractAlertsFromThings (things) {
return alerts
}

// Worker alerts use the same flat shape as thing alerts, so they merge as-is.
async function fetchWorkerExtAlerts (ctx, query) {
const alerts = []
for (const type of ALERT_EXT_DATA_WORKER_TYPES) {
let results
try {
results = await ctx.dataProxy.requestDataMap(RPC_METHODS.GET_WRK_EXT_DATA, { type, query })
} catch {
continue
}
for (const entry of results.flat()) {
if (Array.isArray(entry?.alerts)) alerts.push(...entry.alerts)
}
}
return alerts
}

function matchesSearch (item, search, fields) {
if (!search) return true
const lowerSearch = search.toLowerCase()
Expand Down Expand Up @@ -155,6 +173,8 @@ async function getSiteAlerts (ctx, req) {
const things = results.flat()
let alerts = extractAlertsFromThings(things)

alerts = alerts.concat(await fetchWorkerExtAlerts(ctx, { key: 'alerts' }))

// Re-apply on the merged result for per-alert fields and multi-rack correctness.
alerts = applyMongoFilter(alerts, combineAnd(filter, typeFilter))
alerts = alerts.filter(a => matchesSearch(a, search, SITE_ALERTS_SEARCH_FIELDS))
Expand Down Expand Up @@ -197,6 +217,9 @@ async function getAlertsHistory (ctx, req) {
})

let alerts = results.flat().map(flattenHistoryAlert)

// Worker history entries are already flat, so concat directly (no flatten).
alerts = alerts.concat(await fetchWorkerExtAlerts(ctx, { key: 'alerts-history', start, end }))
alerts = deduplicateAlerts(alerts)

// Re-apply on the merged result for global correctness.
Expand Down
Loading