From 66f2ef9659615bc8061cac9faa2cdce17ebc0ecf Mon Sep 17 00:00:00 2001 From: Olly99999 Date: Tue, 2 Jun 2026 15:13:51 +0100 Subject: [PATCH 1/6] renamed to di.memstats --- di/memstats/init.q | 3 ++ di/memstats/memstats.md | 57 ++++++++++++++++++++++++++ di/memstats/memstats.q | 89 +++++++++++++++++++++++++++++++++++++++++ di/memstats/test.csv | 50 +++++++++++++++++++++++ 4 files changed, 199 insertions(+) create mode 100644 di/memstats/init.q create mode 100644 di/memstats/memstats.md create mode 100644 di/memstats/memstats.q create mode 100644 di/memstats/test.csv diff --git a/di/memstats/init.q b/di/memstats/init.q new file mode 100644 index 00000000..0c8d26cf --- /dev/null +++ b/di/memstats/init.q @@ -0,0 +1,3 @@ +\l ::memstats.q + +export:([objsize;memusageall;memusagevars]) diff --git a/di/memstats/memstats.md b/di/memstats/memstats.md new file mode 100644 index 00000000..751e97f9 --- /dev/null +++ b/di/memstats/memstats.md @@ -0,0 +1,57 @@ +# Memory Usage +This module can be used to calculate the approximate size of an object in memory, or for generating a table containing the approximate size of each object in memory. + +## Main funtions +The module contains two methods for calculating memory usage. + +The `memusage` functions generate a table containing the approximate memoryusage of each object in the kdb session in bytes / megabytes using -22!. This can be useful quick approximations. + +`memusagevars[]`:Generates a table of the approximate memory usage statistics of all variables in a kdb session. + +`memusageall[]`:Generates a table of the approximate memory usage statistics of all variables and views in a kdb session. + +---- + +The `objsize` function is more computationally expensive, it tries to calculate the actual memory size of an object by including nested types and attributes. + +`objsize[]`:Returns the approximate size of an individual kdb object including nested types and attributes. + +## memstats table schema +The memusage table is returned from either the `memusagevars` or `memusageall` functions. + +| Column | Type | Description | +|----------|-------------|---------------------------------------------| +| variable | `symbol` | Namespace and name of variable | +| size | `long` | The approximate size of the object in bytes | +| sizeMB | `int` | The approximatee size of the object in MB | + +## Example +Below is an example of loading the module into a session and viewing the size of different objects. + +```q +\\ Loading the module into a session +memstats: use `di.memstats + +\\ View dictionary of functions +memstats + +\\ Calculating the memory usage of an object + +a:1 / - an atom should return 16 + +b: ([]a:`a`b`c; b:1 2 3) + +memstats.objsize[a] + +memstats.objsize[b] + +// View a and b in the memstats table + +select from memstats.memusagevars[] where variable in `..a`..b + +variable size sizeMB +-------------------- +..b 69 0 +..a 17 0 + +``` \ No newline at end of file diff --git a/di/memstats/memstats.q b/di/memstats/memstats.q new file mode 100644 index 00000000..6bfa1678 --- /dev/null +++ b/di/memstats/memstats.q @@ -0,0 +1,89 @@ +/ library for viewing the approximate memory size of individual kdb objects +/ and viewing the approximate memory usage statistics of a kdb session + +/ functionality to return approximate memory size of kdb+ objects + +attrsize:{ + / `u#2 4 5 unique 32*u + $[`u=a:attr x;32*count distinct x; + / `p#2 2 1 parted (8*u;32*u;8*u+1) + `p=a;8+48*count distinct x; + 0] + }; + +/ (16 bytes + attribute overheads + raw size) to the nearest power of 2 +calcsize:{[c;s;a] `long$2 xexp ceiling 2 xlog 16+a+s*c}; + +vectorsize:{calcsize[count x;typesize x;attrsize x]}; + +/ raw size of atoms according to type, type 20h->76h have 4 bytes pointer size +typesize:{4^0N 1 16 0N 1 2 4 8 4 8 1 8 8 4 4 8 8 4 4 4 abs type x}; + +sampling:{[f;x] + / pick samples randomly accoding to threshold and apply function + threshold:100000; + $[thresholdt:type x;$[-2h=t;32;16]; + / list & enum list + t within 1 76h;vectorsize x; + / exit early for anything above 76h + 76h1000 has no attrbutes (i.e. table unlikely to have 1000 columns, list of strings unlikely to have attr for some objects only + (d[0] within 1 76h)&1=count d:distinct t;calcsize[count x;ptrsize;0]+"j"$scalesampling[{sum calcsize[count each x;typesize x 0;$[10000)&size>0),1,1,does large table show size and sizeMB + +run,0,0,q,tview::select from .test.smalltable,1,1,test memusage and views +true,0,0,q,1~count select from memstats.memusageall[] where variable in `..tview,1,1,is view shown in memusageall table +true,0,0,q,0~count select from memstats.memusagevars[] where variable in `..tview,1,1,no views shown in memusagevars + From ac34b031f58b8ababacc041d289b1d26b5ca32f1 Mon Sep 17 00:00:00 2001 From: Olly99999 Date: Wed, 3 Jun 2026 17:16:05 +0100 Subject: [PATCH 2/6] Fixed typos --- di/memstats/memstats.md | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/di/memstats/memstats.md b/di/memstats/memstats.md index 751e97f9..f19efd61 100644 --- a/di/memstats/memstats.md +++ b/di/memstats/memstats.md @@ -1,7 +1,7 @@ -# Memory Usage +# Memstats This module can be used to calculate the approximate size of an object in memory, or for generating a table containing the approximate size of each object in memory. -## Main funtions +## Main functions The module contains two methods for calculating memory usage. The `memusage` functions generate a table containing the approximate memoryusage of each object in the kdb session in bytes / megabytes using -22!. This can be useful quick approximations. @@ -16,26 +16,26 @@ The `objsize` function is more computationally expensive, it tries to calculate `objsize[]`:Returns the approximate size of an individual kdb object including nested types and attributes. -## memstats table schema +## Memstats table schema The memusage table is returned from either the `memusagevars` or `memusageall` functions. | Column | Type | Description | |----------|-------------|---------------------------------------------| | variable | `symbol` | Namespace and name of variable | | size | `long` | The approximate size of the object in bytes | -| sizeMB | `int` | The approximatee size of the object in MB | +| sizeMB | `int` | The approximate size of the object in MB | ## Example Below is an example of loading the module into a session and viewing the size of different objects. ```q -\\ Loading the module into a session +// Loading the module into a session memstats: use `di.memstats -\\ View dictionary of functions +// View dictionary of functions memstats -\\ Calculating the memory usage of an object +// Calculating the memory usage of an object a:1 / - an atom should return 16 From 886fa87829c3101533294e9490726b76708b97dc Mon Sep 17 00:00:00 2001 From: Olly99999 Date: Fri, 5 Jun 2026 16:52:35 +0100 Subject: [PATCH 3/6] In progress rewrite of di.heartbeat --- di/heartbeat/deps.q | 4 + di/heartbeat/heartbeat.md | 111 ++++++++++++++++++++ di/heartbeat/heartbeat.q | 210 ++++++++++++++++++++++++++++++++++++++ di/heartbeat/init.q | 8 ++ di/heartbeat/test.csv | 56 ++++++++++ 5 files changed, 389 insertions(+) create mode 100644 di/heartbeat/deps.q create mode 100644 di/heartbeat/heartbeat.md create mode 100644 di/heartbeat/heartbeat.q create mode 100644 di/heartbeat/init.q create mode 100644 di/heartbeat/test.csv diff --git a/di/heartbeat/deps.q b/di/heartbeat/deps.q new file mode 100644 index 00000000..42185407 --- /dev/null +++ b/di/heartbeat/deps.q @@ -0,0 +1,4 @@ +/ hard module dependencies and their minimum versions, validated by di.depcheck +/ di.heartbeat has no hard dependencies - all runtime dependencies (log, timer, +/ handlers, pubsub, servers) are injected via init as dictionaries of functions +deps:(`$())!(); diff --git a/di/heartbeat/heartbeat.md b/di/heartbeat/heartbeat.md new file mode 100644 index 00000000..251cd895 --- /dev/null +++ b/di/heartbeat/heartbeat.md @@ -0,0 +1,111 @@ +# Heartbeat + +This module lets every process publish a periodic heartbeat over pub/sub, and lets +monitoring processes detect when a process has stopped beating - i.e. it is stalled +or blocked - even when the underlying connection is still valid. + +It covers both sides: + +* **Publishing** - a process periodically publishes a heartbeat row over pub/sub. +* **Monitoring** - a process subscribes to other processes' heartbeats, stores the + latest beat per process, and raises a *warning* then an *error* when a process + stops heartbeating within the configured grace periods. + +## Dependencies + +All runtime dependencies are **injected** via `init` as dictionaries of functions, +so the module has no hard dependencies on any other module and runs standalone with +minimal built-in fallbacks (logging to stdout, no-op timer/handlers/pubsub). Inject +real implementations to get full functionality. + +| Dependency | Keys | Default fallback | Purpose | +|------------|------|------------------|---------| +| `log` | `info` `warn` `error` (each `{[ctx;msg]}`) | writes to stdout | logging | +| `timer` | `addjob` `deletejobs` `enablejobs` `disablejobs` `getactivejobs` `cp` | no-op (`cp` returns `.z.p`) | scheduling publish/check/subscribe and the current-time source | +| `handlers` | `register` `remove` `list` | no-op | registering the connection-close (`.z.pc`) cleanup | +| `pubsub` | `publish` (`{[table;data]}`) `subscribe` (`{[handle]}`) | no-op | publishing heartbeats / subscribing to publishers | +| `servers` | `getservers` (`{[proctype]}` returning handles) | returns empty | discovering heartbeat publishers by process type | + +`log`, `timer` and `handlers` follow the standard kdb-x core dependency contracts; +`pubsub` and `servers` are heartbeat-specific. + +## Configuration + +`init[config;deps]` takes a configuration dictionary as its first argument. Any +recognised key may be supplied; unset keys keep their defaults. + +| Key | Default | Description | +|-----|---------|-------------| +| `enabled` | `1b` | publish and check heartbeats | +| `subenabled` | `0b` | act as a monitor: subscribe to other heartbeats and register the disconnect handler | +| `debug` | `1b` | log warning / error transitions | +| `publishinterval` | `0D00:00:30` | how often heartbeats are published | +| `checkinterval` | `0D00:00:10` | how often received heartbeats are checked | +| `warningtolerance` | `1.5` | warning after `warningtolerance*publishinterval` without a beat | +| `errortolerance` | `2f` | error after `errortolerance*publishinterval` without a beat | +| `proctype` | `` `unknown `` | this process's type (published as `sym`) | +| `procname` | `.z.h` | this process's name | +| `pid` `host` `port` | from `.z` | this process's identity | +| `connections` | `` `$() `` | process types this monitor subscribes to (used by `hbsubscriptions`) | +| `onwarning` | no-op | callback invoked with the rows entering warning state | +| `onerror` | no-op | callback invoked with the rows entering error state | + +## Public API + +| Function | Description | +|----------|-------------| +| `init[config;deps]` | wire dependencies and configuration, and schedule the timer jobs | +| `publishheartbeat[]` | publish a single heartbeat row and increment the counter | +| `checkheartbeat[]` | flag processes that have not heartbeated in time | +| `storeheartbeat[batch]` | store incoming heartbeat(s); call from `upd` on the monitor | +| `addprocs[proctypes;procnames]` | seed expected processes so a never-seen process is flagged | +| `subscribe[handles]` | subscribe to heartbeats on the given remote handle(s) | +| `hbsubscriptions[]` | subscribe to all configured publishers (by `connections` process type) | +| `gethb[]` | return the heartbeat store | + +## Heartbeat store schema + +`gethb[]` returns the store, keyed on `sym`,`procname`: + +| Column | Type | Description | +|--------|------|-------------| +| sym | `symbol` | process type | +| procname | `symbol` | process name | +| time | `timestamp` | time of the last received heartbeat | +| counter | `long` | counter from the last heartbeat | +| pid | `int` | publisher process id | +| host | `symbol` | publisher host | +| port | `int` | publisher port | +| warning | `boolean` | process is in warning state | +| error | `boolean` | process is in error state | + +## Example + +```q +// load the module +heartbeat: use `di.heartbeat + +// build dependency dictionaries (here using di.log and di.timer) +log: use `di.log +log.init[()!()] +logdep: `info`warn`error!(log.info;log.warn;log.error) + +timer: use `di.timer +timer.init[()!()] +timerdep: `addjob`deletejobs`enablejobs`disablejobs`getactivejobs`cp!( + timer.addjob;timer.deletejobs;timer.enablejobs;timer.disablejobs;timer.getactivejobs;timer.cp) + +// initialise as a publishing RDB +heartbeat.init[`proctype`procname!(`rdb;`rdb1); `log`timer!(logdep;timerdep)] + +// publish a heartbeat immediately (normally the timer does this) +heartbeat.publishheartbeat[] +``` + +On the monitoring side, route received heartbeats into the store from `upd` and let +the scheduled `checkheartbeat` raise warnings / errors: + +```q +upd: {[t;x] if[t~`heartbeat; heartbeat.storeheartbeat[x]]; } +heartbeat.gethb[] +``` diff --git a/di/heartbeat/heartbeat.q b/di/heartbeat/heartbeat.q new file mode 100644 index 00000000..6a09e051 --- /dev/null +++ b/di/heartbeat/heartbeat.q @@ -0,0 +1,210 @@ +/ heartbeat module for kdb-x +/ every process can publish a periodic heartbeat over pub/sub so that downstream +/ monitors can detect when a process has stopped beating - i.e. it is stalled or +/ blocked - even when the underlying connection is still valid +/ module-local state convention: read via .z.M, mutate via .z.m + +/ table used to publish heartbeats - sym holds the publishing process type +heartbeat:([] time:`timestamp$(); sym:`symbol$(); procname:`symbol$(); counter:`long$(); pid:`int$(); host:`symbol$(); port:`int$()); + +/ keyed store of the latest received heartbeat per process, with warning / error state +hb:update warning:0b,error:0b from `sym`procname xkey heartbeat; + +/ remote handles we have already subscribed to for heartbeats +subscribedhandles:`int$(); + +/ heartbeat counter +hbcounter:0; + +/ configuration defaults - overridden by the config dictionary passed to init +enabled:1b; / whether heartbeat publishing / checking is enabled +subenabled:0b; / whether this process monitors (subscribes to) other heartbeats +debug:1b; / whether to log warning / error transitions +publishinterval:0D00:00:30; / how often heartbeats are published +checkinterval:0D00:00:10; / how often received heartbeats are checked +warningtolerance:1.5; / warning after warningtolerance*publishinterval without a beat +errortolerance:2f; / error after errortolerance*publishinterval without a beat +proctype:`unknown; / this process's type (published as sym) +procname:.z.h; / this process's name +pid:.z.i; / this process's pid +host:.z.h; / this process's host +port:`int$system"p"; / this process's port +connections:`$(); / process types this monitor should subscribe to +onwarning:{[procs]}; / callback fired with the rows entering warning state +onerror:{[procs]}; / callback fired with the rows entering error state + +/ recognised configuration keys - anything else passed via config is ignored +configkeys:`enabled`subenabled`debug`publishinterval`checkinterval`warningtolerance`errortolerance`proctype`procname`pid`host`port`connections`onwarning`onerror; + +/ minimal default dependencies so the module works standalone (with less functionality) +/ injected real implementations replace these in init, matched key-for-key +defaultlog:`info`warn`error!( + {[ctx;msg] -1 "INFO ",(string ctx),": ",msg;}; + {[ctx;msg] -1 "WARN ",(string ctx),": ",msg;}; + {[ctx;msg] -2 "ERROR ",(string ctx),": ",msg;}); + +defaulttimer:`addjob`deletejobs`enablejobs`disablejobs`getactivejobs`cp!( + {[id;func;params;period;mode;opts]}; + {[ids]}; + {[ids]}; + {[ids]}; + {[] 0#`id`func`params`period!(`$();();();`int$())}; + {[] .z.p}); + +defaulthandlers:`register`remove`list!( + {[event;name;func]}; + {[event;name]}; + {[event] ()}); + +defaultpubsub:`publish`subscribe!( + {[tbl;data]}; + {[handle]}); + +defaultservers:enlist[`getservers]!enlist {[proctype] `int$()}; + +extractdep:{[deps;name] + / pull a named dependency dictionary out of deps, empty dict when absent or null + $[(name in key deps) and not (::)~deps name;deps name;()!()] + }; + +setdeps:{[deps] + / store injected dependencies, merging over the minimal defaults per key + if[not 99h=type deps;deps:()!()]; + .z.m.log:defaultlog,extractdep[deps;`log]; + .z.m.timer:defaulttimer,extractdep[deps;`timer]; + .z.m.handlers:defaulthandlers,extractdep[deps;`handlers]; + .z.m.pubsub:defaultpubsub,extractdep[deps;`pubsub]; + .z.m.servers:defaultservers,extractdep[deps;`servers]; + }; + +setconfig:{[config] + / apply recognised configuration overrides (a dictionary) on top of current values + if[not 99h=type config;:()]; + ks:configkeys inter key config; + {[k;v] (` sv `.z.m,k) set v}'[ks;config ks]; + }; + +tosecs:{[span] + / convert a timespan into whole seconds for the timer period + `int$span%0D00:00:01 + }; + +registertimers:{ + / schedule the periodic heartbeat jobs via the injected timer (mode 1 = fixed interval) + if[enabled; + .z.M.timer[`addjob][`hbpublish;publishheartbeat;();tosecs publishinterval;1;()!()]; + .z.M.timer[`addjob][`hbcheck;checkheartbeat;();tosecs checkinterval;1;()!()]]; + if[subenabled; + .z.M.timer[`addjob][`hbsubscribe;hbsubscriptions;();60;1;()!()]]; + }; + +registerhandlers:{ + / wire the connection-close cleanup through the injected handler manager + if[subenabled; + .z.M.handlers[`register][`.z.pc;`heartbeat;closeconnection]]; + }; + +init:{[config;deps] + / initialise the module with configuration and injected dependencies - see heartbeat.md + / config - dictionary of configuration overrides (see configkeys), or (::) for defaults + / deps - dictionary of injected dependencies keyed by name, each a dictionary of functions + setdeps deps; + setconfig config; + registertimers[]; + registerhandlers[]; + .z.M.log[`info][`heartbeat;"di.heartbeat initialised"]; + }; + +publishheartbeat:{ + / publish a single heartbeat row over pub/sub and bump the counter + if[not enabled;:()]; + .z.M.pubsub[`publish][`heartbeat;enlist `time`sym`procname`counter`pid`host`port!(.z.M.timer[`cp][];proctype;procname;.z.M.hbcounter;pid;host;port)]; + .z.m.hbcounter:.z.M.hbcounter+1; + }; + +storeheartbeat:{[batch] + / store one or more incoming heartbeats, keeping the latest per process and + / clearing warning / error state - call this from upd when a heartbeat arrives + .z.m.hb:.z.M.hb upsert update warning:0b,error:0b from select by sym,procname from batch; + }; + +addprocs:{[proctypes;procnames] + / seed the store with expected processes so a never-seen process is flagged + / real heartbeats arriving later override these seeded rows + seed:2!([]sym:proctypes,();procname:procnames,();time:.z.M.timer[`cp][];counter:0N;pid:0Ni;host:`;port:0Ni;warning:0b;error:0b); + .z.m.hb:seed,.z.M.hb; + }; + +logwarnproc:{[r] + / log a single process moving into warning state + .z.M.log[`warn][`heartbeat;"process ",(string r`procname)," (type ",(string r`sym),") has not heartbeated since ",string r`time]; + }; + +logerrproc:{[r] + / log a single process moving into error state + .z.M.log[`error][`heartbeat;"process ",(string r`procname)," (type ",(string r`sym),") has not heartbeated since ",string r`time]; + }; + +warn:{[procs] + / move processes into warning state, log and fire the warning callback + if[debug;logwarnproc each 0!procs]; + .z.m.hb:.z.M.hb upsert select sym,procname,warning:1b from procs; + .z.M.onwarning procs; + }; + +err:{[procs] + / move processes into error state, log and fire the error callback + if[debug;logerrproc each 0!procs]; + .z.m.hb:.z.M.hb upsert select sym,procname,error:1b from procs; + .z.M.onerror procs; + }; + +warningperiod:{[processtype] `timespan$warningtolerance*publishinterval}; +errorperiod:{[processtype] `timespan$errortolerance*publishinterval}; + +checkheartbeat:{ + / flag processes that have not heartbeated within the warning / error grace periods + / status: 0 healthy, 1 warning, 2+ error + now:.z.M.timer[`cp][]; + stats:0!update + status:(`short$now>time+warningperiod each sym)+`short$2*now>time+errorperiod each sym + from .z.M.hb; + newwarn:select sym,procname,time from stats where status=1,not warning; + newerr:select sym,procname,time from stats where status>1,not error; + if[count newwarn;warn newwarn]; + if[count newerr;err newerr]; + }; + +subscribeone:{[h] + / subscribe to a single remote heartbeat publisher, logging and skipping on failure + ok:.[{.z.M.pubsub[`subscribe] x;1b};h;{[h;e] .z.M.log[`error][`heartbeat;"failed to subscribe to heartbeats on handle ",(string h),": ",e];0b}[h]]; + if[ok;.z.m.subscribedhandles:distinct .z.M.subscribedhandles,h]; + }; + +subscribe:{[handles] + / subscribe to heartbeats on the given remote handle(s), tracking successful subscriptions + subscribeone each (),handles; + }; + +getheartbeats:{[proctype] + / subscribe to publishers of the given process type(s) that are not yet subscribed + handles:(.z.M.servers[`getservers] proctype) except .z.M.subscribedhandles; + if[count handles; + .z.M.log[`info][`heartbeat;"subscribing to new heartbeat handle(s) ",", " sv string handles]; + subscribe handles]; + }; + +hbsubscriptions:{ + / subscribe to all configured heartbeat publishers (by configured process type) + getheartbeats connections; + }; + +closeconnection:{[h] + / drop a closed handle from the tracked subscriptions - registered against .z.pc + .z.m.subscribedhandles:.z.M.subscribedhandles except h; + }; + +gethb:{ + / return the current heartbeat store for inspection + .z.M.hb + }; \ No newline at end of file diff --git a/di/heartbeat/init.q b/di/heartbeat/init.q new file mode 100644 index 00000000..77ff3d0e --- /dev/null +++ b/di/heartbeat/init.q @@ -0,0 +1,8 @@ +/ load core functionality into the module +\l ::heartbeat.q + +/ module version - compared against dependants' minimum requirements by di.depcheck +version:"0.1.0"; + +/ public api - only the functions intended to be called externally are exported +export:([init;publishheartbeat;checkheartbeat;storeheartbeat;addprocs;subscribe;hbsubscriptions;gethb;version]) diff --git a/di/heartbeat/test.csv b/di/heartbeat/test.csv new file mode 100644 index 00000000..0478e8a9 --- /dev/null +++ b/di/heartbeat/test.csv @@ -0,0 +1,56 @@ +action,ms,bytes,lang,code,repeat,minver,comment +comment,,,,,,,Setup - load module and inject mock dependencies +before,0,0,q,heartbeat:use`di.heartbeat,1,1,load the heartbeat module +before,0,0,q,logdep:`info`warn`error!({[c;m]};{[c;m]};{[c;m]}),1,1,silent log mock +before,0,0,q,.test.now:2025.01.01D00:00:00.000,1,1,controllable current time for the timer mock +before,0,0,q,timerdep:`addjob`deletejobs`enablejobs`disablejobs`getactivejobs`cp!({[id;func;params;period;mode;opts]};{[ids]};{[ids]};{[ids]};{[]([]id:`$())};{[].test.now}),1,1,timer mock with controllable cp +before,0,0,q,psdep:`publish`subscribe!({[t;x] .test.pubt:t;.test.pubx:x;.test.pubcount:.test.pubcount+1};{[h] .test.subh:h}),1,1,pubsub mock capturing publish and subscribe +before,0,0,q,.test.pubcount:0,1,1,initialise publish counter +before,0,0,q,deps:`log`timer`pubsub!(logdep;timerdep;psdep),1,1,assemble dependency dictionary +before,0,0,q,heartbeat.init[`proctype`procname!(`rdb;`rdb1);deps],1,1,initialise with identity config and mocks + +comment,,,,,,,publishheartbeat publishes a row and bumps the counter +run,0,0,q,.test.pubcount:0,1,1,reset publish counter +run,0,0,q,heartbeat.publishheartbeat[],1,1,publish a heartbeat +true,0,0,q,1=.test.pubcount,1,1,publish dependency was called once +true,0,0,q,`heartbeat~.test.pubt,1,1,published to the heartbeat table +true,0,0,q,`rdb~(first .test.pubx)`sym,1,1,published row carries the process type +true,0,0,q,1=.m.di.0heartbeat.hbcounter,1,1,counter incremented + +comment,,,,,,,disabled publishing is a no-op +run,0,0,q,.m.di.0heartbeat.enabled:0b,1,1,disable heartbeating +run,0,0,q,.test.pubcount:0,1,1,reset publish counter +run,0,0,q,heartbeat.publishheartbeat[],1,1,attempt to publish while disabled +true,0,0,q,0=.test.pubcount,1,1,nothing published when disabled +run,0,0,q,.m.di.0heartbeat.enabled:1b,1,1,re-enable heartbeating + +comment,,,,,,,storeheartbeat records incoming beats with cleared state +run,0,0,q,.m.di.0heartbeat.hb:0#.m.di.0heartbeat.hb,1,1,clear the store +run,0,0,q,.test.batch:([]time:enlist .test.now;sym:enlist`rdb;procname:enlist`rdb1;counter:enlist 1;pid:enlist 1i;host:enlist`h;port:enlist 5000i),1,1,build an incoming heartbeat +run,0,0,q,heartbeat.storeheartbeat[.test.batch],1,1,store the heartbeat +true,0,0,q,1=count heartbeat.gethb[],1,1,one process recorded +true,0,0,q,not first exec warning from heartbeat.gethb[],1,1,stored beat has warning cleared + +comment,,,,,,,addprocs seeds an expected process +run,0,0,q,heartbeat.addprocs[`rdb;`rdb2],1,1,seed an expected process +true,0,0,q,`rdb2 in exec procname from heartbeat.gethb[],1,1,seeded process present + +comment,,,,,,,checkheartbeat flags warning then error as time passes +run,0,0,q,.m.di.0heartbeat.hb:0#.m.di.0heartbeat.hb,1,1,clear the store +run,0,0,q,.test.now:2025.01.01D00:00:00.000,1,1,reset time to t0 +run,0,0,q,heartbeat.addprocs[`rdb;`stale],1,1,seed a process that will go stale +run,0,0,q,.test.now:2025.01.01D00:00:50.000,1,1,advance 50s past the 45s warning period +run,0,0,q,heartbeat.checkheartbeat[],1,1,run the check +true,0,0,q,first exec warning from heartbeat.gethb[] where procname=`stale,1,1,process moved to warning +true,0,0,q,not first exec error from heartbeat.gethb[] where procname=`stale,1,1,not yet in error +run,0,0,q,.test.now:2025.01.01D00:02:00.000,1,1,advance past the 60s error period +run,0,0,q,heartbeat.checkheartbeat[],1,1,run the check again +true,0,0,q,first exec error from heartbeat.gethb[] where procname=`stale,1,1,process moved to error + +comment,,,,,,,subscribe tracks handles and closeconnection removes them +run,0,0,q,.m.di.0heartbeat.subscribedhandles:`int$(),1,1,clear tracked handles +run,0,0,q,heartbeat.subscribe[5i],1,1,subscribe to a remote handle +true,0,0,q,5i in .m.di.0heartbeat.subscribedhandles,1,1,handle tracked after subscribe +true,0,0,q,5i~.test.subh,1,1,subscribe dependency was invoked with the handle +run,0,0,q,.m.di.0heartbeat.closeconnection[5i],1,1,simulate connection close +true,0,0,q,not 5i in .m.di.0heartbeat.subscribedhandles,1,1,handle removed on close From 8e5d41d34b4f2ec0af1fa240711e6a3cc2de3dd7 Mon Sep 17 00:00:00 2001 From: Olly99999 Date: Tue, 9 Jun 2026 11:57:59 +0100 Subject: [PATCH 4/6] Changed structure to follow the dependency guide --- di/heartbeat/heartbeat.md | 50 ++++++++----- di/heartbeat/heartbeat.q | 144 +++++++++++++++++++------------------- di/heartbeat/test.csv | 6 ++ 3 files changed, 110 insertions(+), 90 deletions(-) diff --git a/di/heartbeat/heartbeat.md b/di/heartbeat/heartbeat.md index 251cd895..b4be2b5d 100644 --- a/di/heartbeat/heartbeat.md +++ b/di/heartbeat/heartbeat.md @@ -13,21 +13,28 @@ It covers both sides: ## Dependencies -All runtime dependencies are **injected** via `init` as dictionaries of functions, -so the module has no hard dependencies on any other module and runs standalone with -minimal built-in fallbacks (logging to stdout, no-op timer/handlers/pubsub). Inject -real implementations to get full functionality. - -| Dependency | Keys | Default fallback | Purpose | -|------------|------|------------------|---------| -| `log` | `info` `warn` `error` (each `{[ctx;msg]}`) | writes to stdout | logging | -| `timer` | `addjob` `deletejobs` `enablejobs` `disablejobs` `getactivejobs` `cp` | no-op (`cp` returns `.z.p`) | scheduling publish/check/subscribe and the current-time source | -| `handlers` | `register` `remove` `list` | no-op | registering the connection-close (`.z.pc`) cleanup | -| `pubsub` | `publish` (`{[table;data]}`) `subscribe` (`{[handle]}`) | no-op | publishing heartbeats / subscribing to publishers | -| `servers` | `getservers` (`{[proctype]}` returning handles) | returns empty | discovering heartbeat publishers by process type | +All runtime dependencies are **injected** via `init` as dictionaries of functions +(`` `dependency!(dict of functions) ``). They are **required** - `init` errors +immediately with a clear message if a required dependency is missing. There is no +hard dependency on any other module: any module exporting the contracted function +signatures can be supplied. + +| Dependency | Keys | Required | Purpose | +|------------|------|----------|---------| +| `log` | `info` `warn` `error` (each `{[ctx;msg]}`) | always | logging | +| `timer` | `addjob` `deletejobs` `enablejobs` `disablejobs` `getactivejobs` `cp` | always | scheduling publish/check/subscribe and the current-time source (`cp`) | +| `pubsub` | `publish` (`{[table;data]}`) `subscribe` (`{[handle]}`) | always | publishing heartbeats / subscribing to publishers | +| `servers` | `getservers` (`{[proctype]}` returning handles) | when `subenabled` | discovering heartbeat publishers by process type | +| `handlers` | `register` `remove` `list` | when `subenabled` | registering the connection-close (`.z.pc`) cleanup | `log`, `timer` and `handlers` follow the standard kdb-x core dependency contracts; -`pubsub` and `servers` are heartbeat-specific. +`pubsub` and `servers` are heartbeat-specific. `servers` and `handlers` are only +required when `subenabled` is set (i.e. this process monitors other heartbeats); +a pure publisher needs only `log`, `timer` and `pubsub`. + +Only the functions the module actually calls are accessed (`timer`'s `addjob`/`cp`, +`handlers`' `register`), but supplying the full contracted dictionary keeps the +dependency interchangeable with the real `di.*` modules. ## Configuration @@ -85,18 +92,23 @@ recognised key may be supplied; unset keys keep their defaults. // load the module heartbeat: use `di.heartbeat -// build dependency dictionaries (here using di.log and di.timer) -log: use `di.log -log.init[()!()] -logdep: `info`warn`error!(log.info;log.warn;log.error) +// build dependency dictionaries (here from di.log and di.timer) +// note: bound as logmod, not log, since log is a reserved q word +logmod: use `di.log +logmod.init[()!()] +logdep: `info`warn`error!(logmod.info;logmod.warn;logmod.error) timer: use `di.timer timer.init[()!()] timerdep: `addjob`deletejobs`enablejobs`disablejobs`getactivejobs`cp!( timer.addjob;timer.deletejobs;timer.enablejobs;timer.disablejobs;timer.getactivejobs;timer.cp) -// initialise as a publishing RDB -heartbeat.init[`proctype`procname!(`rdb;`rdb1); `log`timer!(logdep;timerdep)] +// a pubsub dependency must provide publish[table;data] and subscribe[handle] +pubsub: use `di.pubsub +psdep: `publish`subscribe!(pubsub.publish; {[h] h(`.m.di.0pubsub.subscribe;`heartbeat;`)}) + +// initialise as a publishing RDB (log, timer and pubsub are all required) +heartbeat.init[`proctype`procname!(`rdb;`rdb1); `log`timer`pubsub!(logdep;timerdep;psdep)] // publish a heartbeat immediately (normally the timer does this) heartbeat.publishheartbeat[] diff --git a/di/heartbeat/heartbeat.q b/di/heartbeat/heartbeat.q index 6a09e051..3b8c4709 100644 --- a/di/heartbeat/heartbeat.q +++ b/di/heartbeat/heartbeat.q @@ -2,7 +2,12 @@ / every process can publish a periodic heartbeat over pub/sub so that downstream / monitors can detect when a process has stopped beating - i.e. it is stalled or / blocked - even when the underlying connection is still valid -/ module-local state convention: read via .z.M, mutate via .z.m +/ the module handles both publishing heartbeats and, on the monitoring side, +/ storing received heartbeats and raising warnings / errors when they stop +/ runtime dependencies are injected via init and are required - the module errors +/ immediately if a required dependency is missing - see heartbeat.md +/ module-local state convention: read config bare, mutate via .z.m, and access +/ injected dependencies via .z.m at every call site / table used to publish heartbeats - sym holds the publishing process type heartbeat:([] time:`timestamp$(); sym:`symbol$(); procname:`symbol$(); counter:`long$(); pid:`int$(); host:`symbol$(); port:`int$()); @@ -36,52 +41,43 @@ onerror:{[procs]}; / callback fired with the rows entering error state / recognised configuration keys - anything else passed via config is ignored configkeys:`enabled`subenabled`debug`publishinterval`checkinterval`warningtolerance`errortolerance`proctype`procname`pid`host`port`connections`onwarning`onerror; -/ minimal default dependencies so the module works standalone (with less functionality) -/ injected real implementations replace these in init, matched key-for-key -defaultlog:`info`warn`error!( - {[ctx;msg] -1 "INFO ",(string ctx),": ",msg;}; - {[ctx;msg] -1 "WARN ",(string ctx),": ",msg;}; - {[ctx;msg] -2 "ERROR ",(string ctx),": ",msg;}); - -defaulttimer:`addjob`deletejobs`enablejobs`disablejobs`getactivejobs`cp!( - {[id;func;params;period;mode;opts]}; - {[ids]}; - {[ids]}; - {[ids]}; - {[] 0#`id`func`params`period!(`$();();();`int$())}; - {[] .z.p}); - -defaulthandlers:`register`remove`list!( - {[event;name;func]}; - {[event;name]}; - {[event] ()}); - -defaultpubsub:`publish`subscribe!( - {[tbl;data]}; - {[handle]}); - -defaultservers:enlist[`getservers]!enlist {[proctype] `int$()}; +/ warning / error grace periods - vary by process type if required +warningperiod:{[processtype] `timespan$warningtolerance*publishinterval}; +errorperiod:{[processtype] `timespan$errortolerance*publishinterval}; -extractdep:{[deps;name] - / pull a named dependency dictionary out of deps, empty dict when absent or null - $[(name in key deps) and not (::)~deps name;deps name;()!()] +requiredep:{[deps;name] + / extract a required dependency dictionary, erroring immediately if absent or null + d:$[99h=type deps;$[(name in key deps) and not (::)~deps name;deps name;()!()];()!()]; + if[not count d; + '"di.heartbeat: ",(string name)," dependency is required; pass it via init deps - see di.",string name]; + d }; setdeps:{[deps] - / store injected dependencies, merging over the minimal defaults per key - if[not 99h=type deps;deps:()!()]; - .z.m.log:defaultlog,extractdep[deps;`log]; - .z.m.timer:defaulttimer,extractdep[deps;`timer]; - .z.m.handlers:defaulthandlers,extractdep[deps;`handlers]; - .z.m.pubsub:defaultpubsub,extractdep[deps;`pubsub]; - .z.m.servers:defaultservers,extractdep[deps;`servers]; + / extract and store the required dependencies, flattened for access via .z.m + / log, timer and pubsub are always required; servers and handlers only when monitoring + logdict:requiredep[deps;`log]; + .z.m.loginfo:logdict`info; + .z.m.logwarn:logdict`warn; + .z.m.logerr:logdict`error; + timerdict:requiredep[deps;`timer]; + .z.m.timeraddjob:timerdict`addjob; + .z.m.timercp:timerdict`cp; + pubsubdict:requiredep[deps;`pubsub]; + .z.m.pubsubpublish:pubsubdict`publish; + .z.m.pubsubsubscribe:pubsubdict`subscribe; + if[subenabled; + serversdict:requiredep[deps;`servers]; + .z.m.serversgetservers:serversdict`getservers; + handlersdict:requiredep[deps;`handlers]; + .z.m.handlersregister:handlersdict`register]; }; setconfig:{[config] / apply recognised configuration overrides (a dictionary) on top of current values - if[not 99h=type config;:()]; - ks:configkeys inter key config; - {[k;v] (` sv `.z.m,k) set v}'[ks;config ks]; + cfg:$[99h=type config;config;()!()]; + ks:configkeys inter key cfg; + (.Q.dd[.z.M] each ks) set' cfg ks; }; tosecs:{[span] @@ -92,83 +88,89 @@ tosecs:{[span] registertimers:{ / schedule the periodic heartbeat jobs via the injected timer (mode 1 = fixed interval) if[enabled; - .z.M.timer[`addjob][`hbpublish;publishheartbeat;();tosecs publishinterval;1;()!()]; - .z.M.timer[`addjob][`hbcheck;checkheartbeat;();tosecs checkinterval;1;()!()]]; + .z.m.timeraddjob[`hbpublish;publishheartbeat;();tosecs publishinterval;1;()!()]; + .z.m.timeraddjob[`hbcheck;checkheartbeat;();tosecs checkinterval;1;()!()]]; if[subenabled; - .z.M.timer[`addjob][`hbsubscribe;hbsubscriptions;();60;1;()!()]]; + .z.m.timeraddjob[`hbsubscribe;hbsubscriptions;();60;1;()!()]]; }; registerhandlers:{ / wire the connection-close cleanup through the injected handler manager if[subenabled; - .z.M.handlers[`register][`.z.pc;`heartbeat;closeconnection]]; + .z.m.handlersregister[`.z.pc;`heartbeat;closeconnection]]; }; init:{[config;deps] / initialise the module with configuration and injected dependencies - see heartbeat.md / config - dictionary of configuration overrides (see configkeys), or (::) for defaults - / deps - dictionary of injected dependencies keyed by name, each a dictionary of functions - setdeps deps; + / deps - dictionary of injected dependencies keyed by name, each a dictionary of functions: + / `log - `info`warn`error - required + / `timer - `addjob`deletejobs`enablejobs`disablejobs`getactivejobs`cp - required + / `pubsub - `publish`subscribe - required + / `servers - `getservers - required when subenabled + / `handlers - `register`remove`list - required when subenabled + / example: + / heartbeat.init[`proctype`procname!(`rdb;`rdb1); `log`timer`pubsub!(logdep;timerdep;psdep)] setconfig config; + setdeps deps; registertimers[]; registerhandlers[]; - .z.M.log[`info][`heartbeat;"di.heartbeat initialised"]; + .z.m.loginfo[`heartbeat;"di.heartbeat initialised"]; }; publishheartbeat:{ / publish a single heartbeat row over pub/sub and bump the counter if[not enabled;:()]; - .z.M.pubsub[`publish][`heartbeat;enlist `time`sym`procname`counter`pid`host`port!(.z.M.timer[`cp][];proctype;procname;.z.M.hbcounter;pid;host;port)]; - .z.m.hbcounter:.z.M.hbcounter+1; + .z.m.pubsubpublish[`heartbeat;enlist `time`sym`procname`counter`pid`host`port!(.z.m.timercp[];proctype;procname;hbcounter;pid;host;port)]; + .z.m.hbcounter:hbcounter+1; }; storeheartbeat:{[batch] / store one or more incoming heartbeats, keeping the latest per process and / clearing warning / error state - call this from upd when a heartbeat arrives - .z.m.hb:.z.M.hb upsert update warning:0b,error:0b from select by sym,procname from batch; + .z.m.hb:hb upsert update warning:0b,error:0b from select by sym,procname from batch; }; addprocs:{[proctypes;procnames] / seed the store with expected processes so a never-seen process is flagged / real heartbeats arriving later override these seeded rows - seed:2!([]sym:proctypes,();procname:procnames,();time:.z.M.timer[`cp][];counter:0N;pid:0Ni;host:`;port:0Ni;warning:0b;error:0b); - .z.m.hb:seed,.z.M.hb; + seed:2!([]sym:proctypes,();procname:procnames,();time:.z.m.timercp[];counter:0N;pid:0Ni;host:`;port:0Ni;warning:0b;error:0b); + .z.m.hb:seed,hb; }; logwarnproc:{[r] / log a single process moving into warning state - .z.M.log[`warn][`heartbeat;"process ",(string r`procname)," (type ",(string r`sym),") has not heartbeated since ",string r`time]; + .z.m.logwarn[`heartbeat;"process ",(string r`procname)," (type ",(string r`sym),") has not heartbeated since ",string r`time]; }; logerrproc:{[r] / log a single process moving into error state - .z.M.log[`error][`heartbeat;"process ",(string r`procname)," (type ",(string r`sym),") has not heartbeated since ",string r`time]; + .z.m.logerr[`heartbeat;"process ",(string r`procname)," (type ",(string r`sym),") has not heartbeated since ",string r`time]; }; warn:{[procs] / move processes into warning state, log and fire the warning callback if[debug;logwarnproc each 0!procs]; - .z.m.hb:.z.M.hb upsert select sym,procname,warning:1b from procs; - .z.M.onwarning procs; + .z.m.hb:hb upsert select sym,procname,warning:1b from procs; + onwarning procs; }; err:{[procs] / move processes into error state, log and fire the error callback if[debug;logerrproc each 0!procs]; - .z.m.hb:.z.M.hb upsert select sym,procname,error:1b from procs; - .z.M.onerror procs; + .z.m.hb:hb upsert select sym,procname,error:1b from procs; + onerror procs; }; -warningperiod:{[processtype] `timespan$warningtolerance*publishinterval}; -errorperiod:{[processtype] `timespan$errortolerance*publishinterval}; - checkheartbeat:{ / flag processes that have not heartbeated within the warning / error grace periods / status: 0 healthy, 1 warning, 2+ error - now:.z.M.timer[`cp][]; - stats:0!update - status:(`short$now>time+warningperiod each sym)+`short$2*now>time+errorperiod each sym - from .z.M.hb; + / grace periods are computed as locals first - module functions do not resolve inside qsql + now:.z.m.timercp[]; + t:0!hb; + wp:warningperiod each t`sym; + ep:errorperiod each t`sym; + stats:update status:(`short$now>time+wp)+`short$2*now>time+ep from t; newwarn:select sym,procname,time from stats where status=1,not warning; newerr:select sym,procname,time from stats where status>1,not error; if[count newwarn;warn newwarn]; @@ -177,8 +179,8 @@ checkheartbeat:{ subscribeone:{[h] / subscribe to a single remote heartbeat publisher, logging and skipping on failure - ok:.[{.z.M.pubsub[`subscribe] x;1b};h;{[h;e] .z.M.log[`error][`heartbeat;"failed to subscribe to heartbeats on handle ",(string h),": ",e];0b}[h]]; - if[ok;.z.m.subscribedhandles:distinct .z.M.subscribedhandles,h]; + ok:@[{.z.m.pubsubsubscribe x;1b};h;{[h;e] .z.m.logerr[`heartbeat;"failed to subscribe to heartbeats on handle ",(string h),": ",e];0b}[h]]; + if[ok;.z.m.subscribedhandles:distinct subscribedhandles,h]; }; subscribe:{[handles] @@ -188,9 +190,9 @@ subscribe:{[handles] getheartbeats:{[proctype] / subscribe to publishers of the given process type(s) that are not yet subscribed - handles:(.z.M.servers[`getservers] proctype) except .z.M.subscribedhandles; + handles:(.z.m.serversgetservers proctype) except subscribedhandles; if[count handles; - .z.M.log[`info][`heartbeat;"subscribing to new heartbeat handle(s) ",", " sv string handles]; + .z.m.loginfo[`heartbeat;"subscribing to new heartbeat handle(s) ",", " sv string handles]; subscribe handles]; }; @@ -201,10 +203,10 @@ hbsubscriptions:{ closeconnection:{[h] / drop a closed handle from the tracked subscriptions - registered against .z.pc - .z.m.subscribedhandles:.z.M.subscribedhandles except h; + .z.m.subscribedhandles:subscribedhandles except h; }; gethb:{ / return the current heartbeat store for inspection - .z.M.hb - }; \ No newline at end of file + hb + }; diff --git a/di/heartbeat/test.csv b/di/heartbeat/test.csv index 0478e8a9..38ea513a 100644 --- a/di/heartbeat/test.csv +++ b/di/heartbeat/test.csv @@ -54,3 +54,9 @@ true,0,0,q,5i in .m.di.0heartbeat.subscribedhandles,1,1,handle tracked after sub true,0,0,q,5i~.test.subh,1,1,subscribe dependency was invoked with the handle run,0,0,q,.m.di.0heartbeat.closeconnection[5i],1,1,simulate connection close true,0,0,q,not 5i in .m.di.0heartbeat.subscribedhandles,1,1,handle removed on close + +comment,,,,,,,init errors immediately when a required dependency is missing +fail,0,0,q,heartbeat.init[()!();()!()],1,1,errors when log dependency missing +fail,0,0,q,heartbeat.init[()!();enlist[`log]!enlist logdep],1,1,errors when timer dependency missing +fail,0,0,q,heartbeat.init[()!();`log`timer!(logdep;timerdep)],1,1,errors when pubsub dependency missing +fail,0,0,q,heartbeat.init[enlist[`subenabled]!enlist 1b;`log`timer`pubsub!(logdep;timerdep;psdep)],1,1,monitor errors when servers dependency missing From 4af597939c1940d769a304eca71ec155107a6f82 Mon Sep 17 00:00:00 2001 From: Olly99999 Date: Fri, 12 Jun 2026 14:07:29 +0100 Subject: [PATCH 5/6] Made changes inline with message about dependencies --- di/heartbeat/heartbeat.md | 14 ++++++++++---- di/heartbeat/heartbeat.q | 26 +++++++++++++++++--------- di/heartbeat/init.q | 2 +- di/heartbeat/test.csv | 3 ++- 4 files changed, 30 insertions(+), 15 deletions(-) diff --git a/di/heartbeat/heartbeat.md b/di/heartbeat/heartbeat.md index b4be2b5d..f623aeae 100644 --- a/di/heartbeat/heartbeat.md +++ b/di/heartbeat/heartbeat.md @@ -22,7 +22,7 @@ signatures can be supplied. | Dependency | Keys | Required | Purpose | |------------|------|----------|---------| | `log` | `info` `warn` `error` (each `{[ctx;msg]}`) | always | logging | -| `timer` | `addjob` `deletejobs` `enablejobs` `disablejobs` `getactivejobs` `cp` | always | scheduling publish/check/subscribe and the current-time source (`cp`) | +| `timer` | `addjob` (the full `di.timer` dict may be passed) | always | scheduling the publish / check / subscribe jobs | | `pubsub` | `publish` (`{[table;data]}`) `subscribe` (`{[handle]}`) | always | publishing heartbeats / subscribing to publishers | | `servers` | `getservers` (`{[proctype]}` returning handles) | when `subenabled` | discovering heartbeat publishers by process type | | `handlers` | `register` `remove` `list` | when `subenabled` | registering the connection-close (`.z.pc`) cleanup | @@ -32,10 +32,15 @@ signatures can be supplied. required when `subenabled` is set (i.e. this process monitors other heartbeats); a pure publisher needs only `log`, `timer` and `pubsub`. -Only the functions the module actually calls are accessed (`timer`'s `addjob`/`cp`, +Only the functions the module actually calls are accessed (`timer`'s `addjob`, `handlers`' `register`), but supplying the full contracted dictionary keeps the dependency interchangeable with the real `di.*` modules. +The module keeps its **own** current-time function rather than taking it from the +timer dependency (so it doesn't rely on the timer exporting a clock getter). It +defaults to `.z.p`; override it with `setcp` for deterministic tests or simulation, +e.g. `heartbeat.setcp[{2025.01.01D00:00:00.000}]`. + ## Configuration `init[config;deps]` takes a configuration dictionary as its first argument. Any @@ -69,6 +74,7 @@ recognised key may be supplied; unset keys keep their defaults. | `subscribe[handles]` | subscribe to heartbeats on the given remote handle(s) | | `hbsubscriptions[]` | subscribe to all configured publishers (by `connections` process type) | | `gethb[]` | return the heartbeat store | +| `setcp[f]` | replace the current-time function (for tests / simulation) | ## Heartbeat store schema @@ -100,8 +106,8 @@ logdep: `info`warn`error!(logmod.info;logmod.warn;logmod.error) timer: use `di.timer timer.init[()!()] -timerdep: `addjob`deletejobs`enablejobs`disablejobs`getactivejobs`cp!( - timer.addjob;timer.deletejobs;timer.enablejobs;timer.disablejobs;timer.getactivejobs;timer.cp) +// heartbeat only needs addjob from the timer - it keeps its own clock (see setcp) +timerdep: enlist[`addjob]!enlist timer.addjob // a pubsub dependency must provide publish[table;data] and subscribe[handle] pubsub: use `di.pubsub diff --git a/di/heartbeat/heartbeat.q b/di/heartbeat/heartbeat.q index 3b8c4709..ea83bd97 100644 --- a/di/heartbeat/heartbeat.q +++ b/di/heartbeat/heartbeat.q @@ -21,6 +21,14 @@ subscribedhandles:`int$(); / heartbeat counter hbcounter:0; +/ current-time function - heartbeat owns its clock; override via setcp for testing / simulation +cp:{.z.p}; + +setcp:{[f] + / replace the current-time function (used by tests and simulation) + .z.m.cp:f; + }; + / configuration defaults - overridden by the config dictionary passed to init enabled:1b; / whether heartbeat publishing / checking is enabled subenabled:0b; / whether this process monitors (subscribes to) other heartbeats @@ -62,7 +70,6 @@ setdeps:{[deps] .z.m.logerr:logdict`error; timerdict:requiredep[deps;`timer]; .z.m.timeraddjob:timerdict`addjob; - .z.m.timercp:timerdict`cp; pubsubdict:requiredep[deps;`pubsub]; .z.m.pubsubpublish:pubsubdict`publish; .z.m.pubsubsubscribe:pubsubdict`subscribe; @@ -104,11 +111,12 @@ init:{[config;deps] / initialise the module with configuration and injected dependencies - see heartbeat.md / config - dictionary of configuration overrides (see configkeys), or (::) for defaults / deps - dictionary of injected dependencies keyed by name, each a dictionary of functions: - / `log - `info`warn`error - required - / `timer - `addjob`deletejobs`enablejobs`disablejobs`getactivejobs`cp - required - / `pubsub - `publish`subscribe - required - / `servers - `getservers - required when subenabled - / `handlers - `register`remove`list - required when subenabled + / `log - `info`warn`error - required + / `timer - `addjob (full di.timer dict may be passed) - required + / `pubsub - `publish`subscribe - required + / `servers - `getservers - required when subenabled + / `handlers - `register`remove`list - required when subenabled + / note: the module keeps its own clock (cp, default .z.p) - override via setcp / example: / heartbeat.init[`proctype`procname!(`rdb;`rdb1); `log`timer`pubsub!(logdep;timerdep;psdep)] setconfig config; @@ -121,7 +129,7 @@ init:{[config;deps] publishheartbeat:{ / publish a single heartbeat row over pub/sub and bump the counter if[not enabled;:()]; - .z.m.pubsubpublish[`heartbeat;enlist `time`sym`procname`counter`pid`host`port!(.z.m.timercp[];proctype;procname;hbcounter;pid;host;port)]; + .z.m.pubsubpublish[`heartbeat;enlist `time`sym`procname`counter`pid`host`port!(cp[];proctype;procname;hbcounter;pid;host;port)]; .z.m.hbcounter:hbcounter+1; }; @@ -134,7 +142,7 @@ storeheartbeat:{[batch] addprocs:{[proctypes;procnames] / seed the store with expected processes so a never-seen process is flagged / real heartbeats arriving later override these seeded rows - seed:2!([]sym:proctypes,();procname:procnames,();time:.z.m.timercp[];counter:0N;pid:0Ni;host:`;port:0Ni;warning:0b;error:0b); + seed:2!([]sym:proctypes,();procname:procnames,();time:cp[];counter:0N;pid:0Ni;host:`;port:0Ni;warning:0b;error:0b); .z.m.hb:seed,hb; }; @@ -166,7 +174,7 @@ checkheartbeat:{ / flag processes that have not heartbeated within the warning / error grace periods / status: 0 healthy, 1 warning, 2+ error / grace periods are computed as locals first - module functions do not resolve inside qsql - now:.z.m.timercp[]; + now:cp[]; t:0!hb; wp:warningperiod each t`sym; ep:errorperiod each t`sym; diff --git a/di/heartbeat/init.q b/di/heartbeat/init.q index 77ff3d0e..22294939 100644 --- a/di/heartbeat/init.q +++ b/di/heartbeat/init.q @@ -5,4 +5,4 @@ version:"0.1.0"; / public api - only the functions intended to be called externally are exported -export:([init;publishheartbeat;checkheartbeat;storeheartbeat;addprocs;subscribe;hbsubscriptions;gethb;version]) +export:([init;publishheartbeat;checkheartbeat;storeheartbeat;addprocs;subscribe;hbsubscriptions;gethb;setcp;version]) diff --git a/di/heartbeat/test.csv b/di/heartbeat/test.csv index 38ea513a..351fb78e 100644 --- a/di/heartbeat/test.csv +++ b/di/heartbeat/test.csv @@ -3,11 +3,12 @@ comment,,,,,,,Setup - load module and inject mock dependencies before,0,0,q,heartbeat:use`di.heartbeat,1,1,load the heartbeat module before,0,0,q,logdep:`info`warn`error!({[c;m]};{[c;m]};{[c;m]}),1,1,silent log mock before,0,0,q,.test.now:2025.01.01D00:00:00.000,1,1,controllable current time for the timer mock -before,0,0,q,timerdep:`addjob`deletejobs`enablejobs`disablejobs`getactivejobs`cp!({[id;func;params;period;mode;opts]};{[ids]};{[ids]};{[ids]};{[]([]id:`$())};{[].test.now}),1,1,timer mock with controllable cp +before,0,0,q,timerdep:enlist[`addjob]!enlist {[id;func;params;period;mode;opts]},1,1,timer mock - heartbeat only requires addjob before,0,0,q,psdep:`publish`subscribe!({[t;x] .test.pubt:t;.test.pubx:x;.test.pubcount:.test.pubcount+1};{[h] .test.subh:h}),1,1,pubsub mock capturing publish and subscribe before,0,0,q,.test.pubcount:0,1,1,initialise publish counter before,0,0,q,deps:`log`timer`pubsub!(logdep;timerdep;psdep),1,1,assemble dependency dictionary before,0,0,q,heartbeat.init[`proctype`procname!(`rdb;`rdb1);deps],1,1,initialise with identity config and mocks +before,0,0,q,heartbeat.setcp[{[].test.now}],1,1,point heartbeat clock at the controllable test time comment,,,,,,,publishheartbeat publishes a row and bumps the counter run,0,0,q,.test.pubcount:0,1,1,reset publish counter From 76099fa1afab1a83406ae70e3b9a478f8b68caec Mon Sep 17 00:00:00 2001 From: Olly99999 Date: Fri, 12 Jun 2026 16:20:37 +0100 Subject: [PATCH 6/6] Made some changes.Use timer mode 2 to avoid catch-up storms, fix timer dep example found during manual test with a real di.timer --- di/heartbeat/heartbeat.md | 3 ++- di/heartbeat/heartbeat.q | 10 ++++++---- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/di/heartbeat/heartbeat.md b/di/heartbeat/heartbeat.md index f623aeae..5708c6b6 100644 --- a/di/heartbeat/heartbeat.md +++ b/di/heartbeat/heartbeat.md @@ -107,7 +107,8 @@ logdep: `info`warn`error!(logmod.info;logmod.warn;logmod.error) timer: use `di.timer timer.init[()!()] // heartbeat only needs addjob from the timer - it keeps its own clock (see setcp) -timerdep: enlist[`addjob]!enlist timer.addjob +// note: di.timer's addjob is a namespace; addjob.custom has the [id;func;params;period;mode;opts] signature heartbeat calls +timerdep: enlist[`addjob]!enlist timer.addjob.custom // a pubsub dependency must provide publish[table;data] and subscribe[handle] pubsub: use `di.pubsub diff --git a/di/heartbeat/heartbeat.q b/di/heartbeat/heartbeat.q index ea83bd97..095eb470 100644 --- a/di/heartbeat/heartbeat.q +++ b/di/heartbeat/heartbeat.q @@ -93,12 +93,14 @@ tosecs:{[span] }; registertimers:{ - / schedule the periodic heartbeat jobs via the injected timer (mode 1 = fixed interval) + / schedule the periodic heartbeat jobs via the injected timer + / mode 2 = period after previous actual start - a heartbeat says "alive now", so + / missed beats must not be replayed as a catch-up storm (which mode 1 would do) if[enabled; - .z.m.timeraddjob[`hbpublish;publishheartbeat;();tosecs publishinterval;1;()!()]; - .z.m.timeraddjob[`hbcheck;checkheartbeat;();tosecs checkinterval;1;()!()]]; + .z.m.timeraddjob[`hbpublish;publishheartbeat;();tosecs publishinterval;2;()!()]; + .z.m.timeraddjob[`hbcheck;checkheartbeat;();tosecs checkinterval;2;()!()]]; if[subenabled; - .z.m.timeraddjob[`hbsubscribe;hbsubscriptions;();60;1;()!()]]; + .z.m.timeraddjob[`hbsubscribe;hbsubscriptions;();60;2;()!()]]; }; registerhandlers:{