diff --git a/di/heartbeat/deps.q b/di/heartbeat/deps.q
new file mode 100644
index 00000000..42185407
--- /dev/null
+++ b/di/heartbeat/deps.q
@@ -0,0 +1,4 @@
+/ hard module dependencies and their minimum versions, validated by di.depcheck
+/ di.heartbeat has no hard dependencies - all runtime dependencies (log, timer,
+/ handlers, pubsub, servers) are injected via init as dictionaries of functions
+deps:(`$())!();
diff --git a/di/heartbeat/heartbeat.md b/di/heartbeat/heartbeat.md
new file mode 100644
index 00000000..5708c6b6
--- /dev/null
+++ b/di/heartbeat/heartbeat.md
@@ -0,0 +1,130 @@
+# Heartbeat
+
+This module lets every process publish a periodic heartbeat over pub/sub, and lets
+monitoring processes detect when a process has stopped beating - i.e. it is stalled
+or blocked - even when the underlying connection is still valid.
+
+It covers both sides:
+
+* **Publishing** - a process periodically publishes a heartbeat row over pub/sub.
+* **Monitoring** - a process subscribes to other processes' heartbeats, stores the
+ latest beat per process, and raises a *warning* then an *error* when a process
+ fails to heartbeat within the configured grace periods.
+
+## Dependencies
+
+All runtime dependencies are **injected** via `init` as dictionaries of functions
+(`` `dependency!(dict of functions) ``). They are **required** - `init` errors
+immediately with a clear message if a required dependency is missing. There is no
+hard dependency on any other module: any module exporting the contracted function
+signatures can be supplied.
+
+| Dependency | Keys | Required | Purpose |
+|------------|------|----------|---------|
+| `log` | `info` `warn` `error` (each `{[ctx;msg]}`) | always | logging |
+| `timer` | `addjob` (the full `di.timer` dict may be passed) | always | scheduling the publish / check / subscribe jobs |
+| `pubsub` | `publish` (`{[table;data]}`) `subscribe` (`{[handle]}`) | always | publishing heartbeats / subscribing to publishers |
+| `servers` | `getservers` (`{[proctype]}` returning handles) | when `subenabled` | discovering heartbeat publishers by process type |
+| `handlers` | `register` `remove` `list` | when `subenabled` | registering the connection-close (`.z.pc`) cleanup |
+
+`log`, `timer` and `handlers` follow the standard kdb-x core dependency contracts;
+`pubsub` and `servers` are heartbeat-specific. `servers` and `handlers` are only
+required when `subenabled` is set (i.e. this process monitors other heartbeats);
+a pure publisher needs only `log`, `timer` and `pubsub`.
+
+Only the functions the module actually calls are accessed (`timer`'s `addjob`,
+`handlers`' `register`), but supplying the full contracted dictionary keeps the
+dependency interchangeable with the real `di.*` modules.
+
+The module keeps its **own** current-time function rather than taking it from the
+timer dependency (so it doesn't rely on the timer exporting a clock getter). It
+defaults to `.z.p`; override it with `setcp` for deterministic tests or simulation,
+e.g. `heartbeat.setcp[{2025.01.01D00:00:00.000}]`.
+
+## Configuration
+
+`init[config;deps]` takes a configuration dictionary as its first argument. Any
+recognised key may be supplied; unset keys keep their defaults.
+
+| Key | Default | Description |
+|-----|---------|-------------|
+| `enabled` | `1b` | publish and check heartbeats |
+| `subenabled` | `0b` | act as a monitor: subscribe to other heartbeats and register the disconnect handler |
+| `debug` | `1b` | log warning / error transitions |
+| `publishinterval` | `0D00:00:30` | how often heartbeats are published |
+| `checkinterval` | `0D00:00:10` | how often received heartbeats are checked |
+| `warningtolerance` | `1.5` | warning after `warningtolerance*publishinterval` without a beat |
+| `errortolerance` | `2f` | error after `errortolerance*publishinterval` without a beat |
+| `proctype` | `` `unknown `` | this process's type (published as `sym`) |
+| `procname` | `.z.h` | this process's name |
+| `pid` `host` `port` | from `.z` | this process's identity |
+| `connections` | `` `$() `` | process types this monitor subscribes to (used by `hbsubscriptions`) |
+| `onwarning` | no-op | callback invoked with the rows entering warning state |
+| `onerror` | no-op | callback invoked with the rows entering error state |
+
+## Public API
+
+| Function | Description |
+|----------|-------------|
+| `init[config;deps]` | wire dependencies and configuration, and schedule the timer jobs |
+| `publishheartbeat[]` | publish a single heartbeat row and increment the counter |
+| `checkheartbeat[]` | flag processes that have not heartbeated in time |
+| `storeheartbeat[batch]` | store incoming heartbeat(s); call from `upd` on the monitor |
+| `addprocs[proctypes;procnames]` | seed expected processes so a never-seen process is flagged |
+| `subscribe[handles]` | subscribe to heartbeats on the given remote handle(s) |
+| `hbsubscriptions[]` | subscribe to all configured publishers (by `connections` process type) |
+| `gethb[]` | return the heartbeat store |
+| `setcp[f]` | replace the current-time function (for tests / simulation) |
+
+## Heartbeat store schema
+
+`gethb[]` returns the store, keyed on `sym`,`procname`:
+
+| Column | Type | Description |
+|--------|------|-------------|
+| sym | `symbol` | process type |
+| procname | `symbol` | process name |
+| time | `timestamp` | time of the last received heartbeat |
+| counter | `long` | counter from the last heartbeat |
+| pid | `int` | publisher process id |
+| host | `symbol` | publisher host |
+| port | `int` | publisher port |
+| warning | `boolean` | process is in warning state |
+| error | `boolean` | process is in error state |
+
+## Example
+
+```q
+// load the module
+heartbeat: use `di.heartbeat
+
+// build dependency dictionaries (here from di.log and di.timer)
+// note: bound as logmod, not log, since log is a reserved q word
+logmod: use `di.log
+logmod.init[()!()]
+logdep: `info`warn`error!(logmod.info;logmod.warn;logmod.error)
+
+timer: use `di.timer
+timer.init[()!()]
+// heartbeat only needs addjob from the timer - it keeps its own clock (see setcp)
+// note: di.timer's addjob is a namespace; addjob.custom has the [id;func;params;period;mode;opts] signature heartbeat calls
+timerdep: enlist[`addjob]!enlist timer.addjob.custom
+
+// a pubsub dependency must provide publish[table;data] and subscribe[handle]
+pubsub: use `di.pubsub
+psdep: `publish`subscribe!(pubsub.publish; {[h] h(`.m.di.0pubsub.subscribe;`heartbeat;`)})
+
+// initialise as a publishing RDB (log, timer and pubsub are all required)
+heartbeat.init[`proctype`procname!(`rdb;`rdb1); `log`timer`pubsub!(logdep;timerdep;psdep)]
+
+// publish a heartbeat immediately (normally the timer does this)
+heartbeat.publishheartbeat[]
+```
+
+On the monitoring side, route received heartbeats into the store from `upd` and let
+the scheduled `checkheartbeat` raise warnings / errors:
+
+```q
+upd: {[t;x] if[t~`heartbeat; heartbeat.storeheartbeat[x]]; }
+heartbeat.gethb[]
+```
diff --git a/di/heartbeat/heartbeat.q b/di/heartbeat/heartbeat.q
new file mode 100644
index 00000000..095eb470
--- /dev/null
+++ b/di/heartbeat/heartbeat.q
@@ -0,0 +1,222 @@
+/ heartbeat module for kdb-x
+/ every process can publish a periodic heartbeat over pub/sub so that downstream
+/ monitors can detect when a process has stopped beating - i.e. it is stalled or
+/ blocked - even when the underlying connection is still valid
+/ the module handles both publishing heartbeats and, on the monitoring side,
+/ storing received heartbeats and raising warnings / errors when they stop
+/ runtime dependencies are injected via init and are required - the module errors
+/ immediately if a required dependency is missing - see heartbeat.md
+/ module-local state convention: read config bare, mutate via .z.m, and access
+/ injected dependencies via .z.m at every call site
+
+/ table used to publish heartbeats - sym holds the publishing process type
+heartbeat:([] time:`timestamp$(); sym:`symbol$(); procname:`symbol$(); counter:`long$(); pid:`int$(); host:`symbol$(); port:`int$());
+
+/ keyed store of the latest received heartbeat per process, with warning / error state
+hb:update warning:0b,error:0b from `sym`procname xkey heartbeat;
+
+/ remote handles we have already subscribed to for heartbeats
+subscribedhandles:`int$();
+
+/ heartbeat counter
+hbcounter:0;
+
+/ current-time function - heartbeat owns its clock; override via setcp for testing / simulation
+cp:{.z.p};
+
+setcp:{[f]
+ / replace the current-time function (used by tests and simulation)
+ .z.m.cp:f;
+ };
+
+/ configuration defaults - overridden by the config dictionary passed to init
+enabled:1b; / whether heartbeat publishing / checking is enabled
+subenabled:0b; / whether this process monitors (subscribes to) other heartbeats
+debug:1b; / whether to log warning / error transitions
+publishinterval:0D00:00:30; / how often heartbeats are published
+checkinterval:0D00:00:10; / how often received heartbeats are checked
+warningtolerance:1.5; / warning after warningtolerance*publishinterval without a beat
+errortolerance:2f; / error after errortolerance*publishinterval without a beat
+proctype:`unknown; / this process's type (published as sym)
+procname:.z.h; / this process's name
+pid:.z.i; / this process's pid
+host:.z.h; / this process's host
+port:`int$system"p"; / this process's port
+connections:`$(); / process types this monitor should subscribe to
+onwarning:{[procs]}; / callback fired with the rows entering warning state
+onerror:{[procs]}; / callback fired with the rows entering error state
+
+/ recognised configuration keys - anything else passed via config is ignored
+configkeys:`enabled`subenabled`debug`publishinterval`checkinterval`warningtolerance`errortolerance`proctype`procname`pid`host`port`connections`onwarning`onerror;
+
+/ warning / error grace periods - vary by process type if required
+warningperiod:{[processtype] `timespan$warningtolerance*publishinterval};
+errorperiod:{[processtype] `timespan$errortolerance*publishinterval};
+
+requiredep:{[deps;name]
+ / extract a required dependency dictionary, erroring immediately if absent or null
+ d:$[99h=type deps;$[(name in key deps) and not (::)~deps name;deps name;()!()];()!()];
+ if[not count d;
+ '"di.heartbeat: ",(string name)," dependency is required; pass it via init deps - see di.",string name];
+ d
+ };
+
+setdeps:{[deps]
+ / extract and store the required dependencies, flattened for access via .z.m
+ / log, timer and pubsub are always required; servers and handlers only when monitoring
+ logdict:requiredep[deps;`log];
+ .z.m.loginfo:logdict`info;
+ .z.m.logwarn:logdict`warn;
+ .z.m.logerr:logdict`error;
+ timerdict:requiredep[deps;`timer];
+ .z.m.timeraddjob:timerdict`addjob;
+ pubsubdict:requiredep[deps;`pubsub];
+ .z.m.pubsubpublish:pubsubdict`publish;
+ .z.m.pubsubsubscribe:pubsubdict`subscribe;
+ if[subenabled;
+ serversdict:requiredep[deps;`servers];
+ .z.m.serversgetservers:serversdict`getservers;
+ handlersdict:requiredep[deps;`handlers];
+ .z.m.handlersregister:handlersdict`register];
+ };
+
+setconfig:{[config]
+ / apply recognised configuration overrides (a dictionary) on top of current values
+ cfg:$[99h=type config;config;()!()];
+ ks:configkeys inter key cfg;
+ (.Q.dd[.z.M] each ks) set' cfg ks;
+ };
+
+tosecs:{[span]
+ / convert a timespan into whole seconds for the timer period
+ `int$span%0D00:00:01
+ };
+
+registertimers:{
+ / schedule the periodic heartbeat jobs via the injected timer
+ / mode 2 = period after previous actual start - a heartbeat says "alive now", so
+ / missed beats must not be replayed as a catch-up storm (which mode 1 would do)
+ if[enabled;
+ .z.m.timeraddjob[`hbpublish;publishheartbeat;();tosecs publishinterval;2;()!()];
+ .z.m.timeraddjob[`hbcheck;checkheartbeat;();tosecs checkinterval;2;()!()]];
+ if[subenabled;
+ .z.m.timeraddjob[`hbsubscribe;hbsubscriptions;();60;2;()!()]];
+ };
+
+registerhandlers:{
+ / wire the connection-close cleanup through the injected handler manager
+ if[subenabled;
+ .z.m.handlersregister[`.z.pc;`heartbeat;closeconnection]];
+ };
+
+init:{[config;deps]
+ / initialise the module with configuration and injected dependencies - see heartbeat.md
+ / config - dictionary of configuration overrides (see configkeys), or (::) for defaults
+ / deps - dictionary of injected dependencies keyed by name, each a dictionary of functions:
+ / `log - `info`warn`error - required
+ / `timer - `addjob (full di.timer dict may be passed) - required
+ / `pubsub - `publish`subscribe - required
+ / `servers - `getservers - required when subenabled
+ / `handlers - `register`remove`list - required when subenabled
+ / note: the module keeps its own clock (cp, default .z.p) - override via setcp
+ / example:
+ / heartbeat.init[`proctype`procname!(`rdb;`rdb1); `log`timer`pubsub!(logdep;timerdep;psdep)]
+ setconfig config;
+ setdeps deps;
+ registertimers[];
+ registerhandlers[];
+ .z.m.loginfo[`heartbeat;"di.heartbeat initialised"];
+ };
+
+publishheartbeat:{
+ / publish a single heartbeat row over pub/sub and bump the counter
+ if[not enabled;:()];
+ .z.m.pubsubpublish[`heartbeat;enlist `time`sym`procname`counter`pid`host`port!(cp[];proctype;procname;hbcounter;pid;host;port)];
+ .z.m.hbcounter:hbcounter+1;
+ };
+
+storeheartbeat:{[batch]
+ / store one or more incoming heartbeats, keeping the latest per process and
+ / clearing warning / error state - call this from upd when a heartbeat arrives
+ .z.m.hb:hb upsert update warning:0b,error:0b from select by sym,procname from batch;
+ };
+
+addprocs:{[proctypes;procnames]
+ / seed the store with expected processes so a never-seen process is flagged
+ / real heartbeats arriving later override these seeded rows
+ seed:2!([]sym:proctypes,();procname:procnames,();time:cp[];counter:0N;pid:0Ni;host:`;port:0Ni;warning:0b;error:0b);
+ .z.m.hb:seed,hb;
+ };
+
+logwarnproc:{[r]
+ / log a single process moving into warning state
+ .z.m.logwarn[`heartbeat;"process ",(string r`procname)," (type ",(string r`sym),") has not heartbeated since ",string r`time];
+ };
+
+logerrproc:{[r]
+ / log a single process moving into error state
+ .z.m.logerr[`heartbeat;"process ",(string r`procname)," (type ",(string r`sym),") has not heartbeated since ",string r`time];
+ };
+
+warn:{[procs]
+ / move processes into warning state, log and fire the warning callback
+ if[debug;logwarnproc each 0!procs];
+ .z.m.hb:hb upsert select sym,procname,warning:1b from procs;
+ onwarning procs;
+ };
+
+err:{[procs]
+ / move processes into error state, log and fire the error callback
+ if[debug;logerrproc each 0!procs];
+ .z.m.hb:hb upsert select sym,procname,error:1b from procs;
+ onerror procs;
+ };
+
+checkheartbeat:{
+ / flag processes that have not heartbeated within the warning / error grace periods
+ / status: 0 healthy, 1 warning, 2+ error
+ / grace periods are computed as locals first - module functions do not resolve inside qsql
+ now:cp[];
+ t:0!hb;
+ wp:warningperiod each t`sym;
+ ep:errorperiod each t`sym;
+ stats:update status:(`short$now>time+wp)+`short$2*now>time+ep from t;
+ newwarn:select sym,procname,time from stats where status=1,not warning;
+ newerr:select sym,procname,time from stats where status>1,not error;
+ if[count newwarn;warn newwarn];
+ if[count newerr;err newerr];
+ };
+
+subscribeone:{[h]
+ / subscribe to a single remote heartbeat publisher, logging and skipping on failure
+ ok:@[{.z.m.pubsubsubscribe x;1b};h;{[h;e] .z.m.logerr[`heartbeat;"failed to subscribe to heartbeats on handle ",(string h),": ",e];0b}[h]];
+ if[ok;.z.m.subscribedhandles:distinct subscribedhandles,h];
+ };
+
+subscribe:{[handles]
+ / subscribe to heartbeats on the given remote handle(s), tracking successful subscriptions
+ subscribeone each (),handles;
+ };
+
+getheartbeats:{[proctype]
+ / subscribe to publishers of the given process type(s) that are not yet subscribed
+ handles:(.z.m.serversgetservers proctype) except subscribedhandles;
+ if[count handles;
+ .z.m.loginfo[`heartbeat;"subscribing to new heartbeat handle(s) ",", " sv string handles];
+ subscribe handles];
+ };
+
+hbsubscriptions:{
+ / subscribe to all configured heartbeat publishers (by configured process type)
+ getheartbeats connections;
+ };
+
+closeconnection:{[h]
+ / drop a closed handle from the tracked subscriptions - registered against .z.pc
+ .z.m.subscribedhandles:subscribedhandles except h;
+ };
+
+gethb:{
+ / return the current heartbeat store for inspection
+ hb
+ };
diff --git a/di/heartbeat/init.q b/di/heartbeat/init.q
new file mode 100644
index 00000000..22294939
--- /dev/null
+++ b/di/heartbeat/init.q
@@ -0,0 +1,8 @@
+/ load core functionality into the module
+\l ::heartbeat.q
+
+/ module version - compared against dependants' minimum requirements by di.depcheck
+version:"0.1.0";
+
+/ public api - only the functions intended to be called externally are exported
+export:([init;publishheartbeat;checkheartbeat;storeheartbeat;addprocs;subscribe;hbsubscriptions;gethb;setcp;version])
diff --git a/di/heartbeat/test.csv b/di/heartbeat/test.csv
new file mode 100644
index 00000000..351fb78e
--- /dev/null
+++ b/di/heartbeat/test.csv
@@ -0,0 +1,63 @@
+action,ms,bytes,lang,code,repeat,minver,comment
+comment,,,,,,,Setup - load module and inject mock dependencies
+before,0,0,q,heartbeat:use`di.heartbeat,1,1,load the heartbeat module
+before,0,0,q,logdep:`info`warn`error!({[c;m]};{[c;m]};{[c;m]}),1,1,silent log mock
+before,0,0,q,.test.now:2025.01.01D00:00:00.000,1,1,controllable current time for the timer mock
+before,0,0,q,timerdep:enlist[`addjob]!enlist {[id;func;params;period;mode;opts]},1,1,timer mock - heartbeat only requires addjob
+before,0,0,q,psdep:`publish`subscribe!({[t;x] .test.pubt:t;.test.pubx:x;.test.pubcount:.test.pubcount+1};{[h] .test.subh:h}),1,1,pubsub mock capturing publish and subscribe
+before,0,0,q,.test.pubcount:0,1,1,initialise publish counter
+before,0,0,q,deps:`log`timer`pubsub!(logdep;timerdep;psdep),1,1,assemble dependency dictionary
+before,0,0,q,heartbeat.init[`proctype`procname!(`rdb;`rdb1);deps],1,1,initialise with identity config and mocks
+before,0,0,q,heartbeat.setcp[{[].test.now}],1,1,point heartbeat clock at the controllable test time
+
+comment,,,,,,,publishheartbeat publishes a row and bumps the counter
+run,0,0,q,.test.pubcount:0,1,1,reset publish counter
+run,0,0,q,heartbeat.publishheartbeat[],1,1,publish a heartbeat
+true,0,0,q,1=.test.pubcount,1,1,publish dependency was called once
+true,0,0,q,`heartbeat~.test.pubt,1,1,published to the heartbeat table
+true,0,0,q,`rdb~(first .test.pubx)`sym,1,1,published row carries the process type
+true,0,0,q,1=.m.di.0heartbeat.hbcounter,1,1,counter incremented
+
+comment,,,,,,,disabled publishing is a no-op
+run,0,0,q,.m.di.0heartbeat.enabled:0b,1,1,disable heartbeating
+run,0,0,q,.test.pubcount:0,1,1,reset publish counter
+run,0,0,q,heartbeat.publishheartbeat[],1,1,attempt to publish while disabled
+true,0,0,q,0=.test.pubcount,1,1,nothing published when disabled
+run,0,0,q,.m.di.0heartbeat.enabled:1b,1,1,re-enable heartbeating
+
+comment,,,,,,,storeheartbeat records incoming beats with cleared state
+run,0,0,q,.m.di.0heartbeat.hb:0#.m.di.0heartbeat.hb,1,1,clear the store
+run,0,0,q,.test.batch:([]time:enlist .test.now;sym:enlist`rdb;procname:enlist`rdb1;counter:enlist 1;pid:enlist 1i;host:enlist`h;port:enlist 5000i),1,1,build an incoming heartbeat
+run,0,0,q,heartbeat.storeheartbeat[.test.batch],1,1,store the heartbeat
+true,0,0,q,1=count heartbeat.gethb[],1,1,one process recorded
+true,0,0,q,not first exec warning from heartbeat.gethb[],1,1,stored beat has warning cleared
+
+comment,,,,,,,addprocs seeds an expected process
+run,0,0,q,heartbeat.addprocs[`rdb;`rdb2],1,1,seed an expected process
+true,0,0,q,`rdb2 in exec procname from heartbeat.gethb[],1,1,seeded process present
+
+comment,,,,,,,checkheartbeat flags warning then error as time passes
+run,0,0,q,.m.di.0heartbeat.hb:0#.m.di.0heartbeat.hb,1,1,clear the store
+run,0,0,q,.test.now:2025.01.01D00:00:00.000,1,1,reset time to t0
+run,0,0,q,heartbeat.addprocs[`rdb;`stale],1,1,seed a process that will go stale
+run,0,0,q,.test.now:2025.01.01D00:00:50.000,1,1,advance 50s past the 45s warning period
+run,0,0,q,heartbeat.checkheartbeat[],1,1,run the check
+true,0,0,q,first exec warning from heartbeat.gethb[] where procname=`stale,1,1,process moved to warning
+true,0,0,q,not first exec error from heartbeat.gethb[] where procname=`stale,1,1,not yet in error
+run,0,0,q,.test.now:2025.01.01D00:02:00.000,1,1,advance past the 60s error period
+run,0,0,q,heartbeat.checkheartbeat[],1,1,run the check again
+true,0,0,q,first exec error from heartbeat.gethb[] where procname=`stale,1,1,process moved to error
+
+comment,,,,,,,subscribe tracks handles and closeconnection removes them
+run,0,0,q,.m.di.0heartbeat.subscribedhandles:`int$(),1,1,clear tracked handles
+run,0,0,q,heartbeat.subscribe[5i],1,1,subscribe to a remote handle
+true,0,0,q,5i in .m.di.0heartbeat.subscribedhandles,1,1,handle tracked after subscribe
+true,0,0,q,5i~.test.subh,1,1,subscribe dependency was invoked with the handle
+run,0,0,q,.m.di.0heartbeat.closeconnection[5i],1,1,simulate connection close
+true,0,0,q,not 5i in .m.di.0heartbeat.subscribedhandles,1,1,handle removed on close
+
+comment,,,,,,,init errors immediately when a required dependency is missing
+fail,0,0,q,heartbeat.init[()!();()!()],1,1,errors when log dependency missing
+fail,0,0,q,heartbeat.init[()!();enlist[`log]!enlist logdep],1,1,errors when timer dependency missing
+fail,0,0,q,heartbeat.init[()!();`log`timer!(logdep;timerdep)],1,1,errors when pubsub dependency missing
+fail,0,0,q,heartbeat.init[enlist[`subenabled]!enlist 1b;`log`timer`pubsub!(logdep;timerdep;psdep)],1,1,monitor errors when servers dependency missing