Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions di/heartbeat/deps.q
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
/ hard module dependencies and their minimum versions, validated by di.depcheck
/ di.heartbeat has no hard dependencies - all runtime dependencies (log, timer,
/ handlers, pubsub, servers) are injected via init as dictionaries of functions
deps:(`$())!();
130 changes: 130 additions & 0 deletions di/heartbeat/heartbeat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
# Heartbeat

This module lets every process publish a periodic heartbeat over pub/sub, and lets
monitoring processes detect when a process has stopped beating - i.e. it is stalled
or blocked - even when the underlying connection is still valid.

It covers both sides:

* **Publishing** - a process periodically publishes a heartbeat row over pub/sub.
* **Monitoring** - a process subscribes to other processes' heartbeats, stores the
latest beat per process, and raises a *warning* then an *error* when a process
stops heartbeating within the configured grace periods.

## Dependencies

All runtime dependencies are **injected** via `init` as dictionaries of functions
(`` `dependency!(dict of functions) ``). They are **required** - `init` errors
immediately with a clear message if a required dependency is missing. There is no
hard dependency on any other module: any module exporting the contracted function
signatures can be supplied.

| Dependency | Keys | Required | Purpose |
|------------|------|----------|---------|
| `log` | `info` `warn` `error` (each `{[ctx;msg]}`) | always | logging |
| `timer` | `addjob` (the full `di.timer` dict may be passed) | always | scheduling the publish / check / subscribe jobs |
| `pubsub` | `publish` (`{[table;data]}`) `subscribe` (`{[handle]}`) | always | publishing heartbeats / subscribing to publishers |
| `servers` | `getservers` (`{[proctype]}` returning handles) | when `subenabled` | discovering heartbeat publishers by process type |
| `handlers` | `register` `remove` `list` | when `subenabled` | registering the connection-close (`.z.pc`) cleanup |

`log`, `timer` and `handlers` follow the standard kdb-x core dependency contracts;
`pubsub` and `servers` are heartbeat-specific. `servers` and `handlers` are only
required when `subenabled` is set (i.e. this process monitors other heartbeats);
a pure publisher needs only `log`, `timer` and `pubsub`.

Only the functions the module actually calls are accessed (`timer`'s `addjob`,
`handlers`' `register`), but supplying the full contracted dictionary keeps the
dependency interchangeable with the real `di.*` modules.

The module keeps its **own** current-time function rather than taking it from the
timer dependency (so it doesn't rely on the timer exporting a clock getter). It
defaults to `.z.p`; override it with `setcp` for deterministic tests or simulation,
e.g. `heartbeat.setcp[{2025.01.01D00:00:00.000}]`.

## Configuration

`init[config;deps]` takes a configuration dictionary as its first argument. Any
recognised key may be supplied; unset keys keep their defaults.

| Key | Default | Description |
|-----|---------|-------------|
| `enabled` | `1b` | publish and check heartbeats |
| `subenabled` | `0b` | act as a monitor: subscribe to other heartbeats and register the disconnect handler |
| `debug` | `1b` | log warning / error transitions |
| `publishinterval` | `0D00:00:30` | how often heartbeats are published |
| `checkinterval` | `0D00:00:10` | how often received heartbeats are checked |
| `warningtolerance` | `1.5` | warning after `warningtolerance*publishinterval` without a beat |
| `errortolerance` | `2f` | error after `errortolerance*publishinterval` without a beat |
| `proctype` | `` `unknown `` | this process's type (published as `sym`) |
| `procname` | `.z.h` | this process's name |
| `pid` `host` `port` | from `.z` | this process's identity |
| `connections` | `` `$() `` | process types this monitor subscribes to (used by `hbsubscriptions`) |
| `onwarning` | no-op | callback invoked with the rows entering warning state |
| `onerror` | no-op | callback invoked with the rows entering error state |

## Public API

| Function | Description |
|----------|-------------|
| `init[config;deps]` | wire dependencies and configuration, and schedule the timer jobs |
| `publishheartbeat[]` | publish a single heartbeat row and increment the counter |
| `checkheartbeat[]` | flag processes that have not heartbeated in time |
| `storeheartbeat[batch]` | store incoming heartbeat(s); call from `upd` on the monitor |
| `addprocs[proctypes;procnames]` | seed expected processes so a never-seen process is flagged |
| `subscribe[handles]` | subscribe to heartbeats on the given remote handle(s) |
| `hbsubscriptions[]` | subscribe to all configured publishers (by `connections` process type) |
| `gethb[]` | return the heartbeat store |
| `setcp[f]` | replace the current-time function (for tests / simulation) |

## Heartbeat store schema

`gethb[]` returns the store, keyed on `sym`,`procname`:

| Column | Type | Description |
|--------|------|-------------|
| sym | `symbol` | process type |
| procname | `symbol` | process name |
| time | `timestamp` | time of the last received heartbeat |
| counter | `long` | counter from the last heartbeat |
| pid | `int` | publisher process id |
| host | `symbol` | publisher host |
| port | `int` | publisher port |
| warning | `boolean` | process is in warning state |
| error | `boolean` | process is in error state |

## Example

```q
// load the module
heartbeat: use `di.heartbeat

// build dependency dictionaries (here from di.log and di.timer)
// note: bound as logmod, not log, since log is a reserved q word
logmod: use `di.log
logmod.init[()!()]
logdep: `info`warn`error!(logmod.info;logmod.warn;logmod.error)

timer: use `di.timer
timer.init[()!()]
// heartbeat only needs addjob from the timer - it keeps its own clock (see setcp)
// note: di.timer's addjob is a namespace; addjob.custom has the [id;func;params;period;mode;opts] signature heartbeat calls
timerdep: enlist[`addjob]!enlist timer.addjob.custom

// a pubsub dependency must provide publish[table;data] and subscribe[handle]
pubsub: use `di.pubsub
psdep: `publish`subscribe!(pubsub.publish; {[h] h(`.m.di.0pubsub.subscribe;`heartbeat;`)})

// initialise as a publishing RDB (log, timer and pubsub are all required)
heartbeat.init[`proctype`procname!(`rdb;`rdb1); `log`timer`pubsub!(logdep;timerdep;psdep)]

// publish a heartbeat immediately (normally the timer does this)
heartbeat.publishheartbeat[]
```

On the monitoring side, route received heartbeats into the store from `upd` and let
the scheduled `checkheartbeat` raise warnings / errors:

```q
upd: {[t;x] if[t~`heartbeat; heartbeat.storeheartbeat[x]]; }
heartbeat.gethb[]
```
222 changes: 222 additions & 0 deletions di/heartbeat/heartbeat.q
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
/ heartbeat module for kdb-x
/ every process can publish a periodic heartbeat over pub/sub so that downstream
/ monitors can detect when a process has stopped beating - i.e. it is stalled or
/ blocked - even when the underlying connection is still valid
/ the module handles both publishing heartbeats and, on the monitoring side,
/ storing received heartbeats and raising warnings / errors when they stop
/ runtime dependencies are injected via init and are required - the module errors
/ immediately if a required dependency is missing - see heartbeat.md
/ module-local state convention: read config bare, mutate via .z.m, and access
/ injected dependencies via .z.m at every call site

/ table used to publish heartbeats - sym holds the publishing process type
heartbeat:([] time:`timestamp$(); sym:`symbol$(); procname:`symbol$(); counter:`long$(); pid:`int$(); host:`symbol$(); port:`int$());

/ keyed store of the latest received heartbeat per process, with warning / error state
hb:update warning:0b,error:0b from `sym`procname xkey heartbeat;

/ remote handles we have already subscribed to for heartbeats
subscribedhandles:`int$();

/ heartbeat counter
hbcounter:0;

/ current-time function - heartbeat owns its clock; override via setcp for testing / simulation
cp:{.z.p};

setcp:{[f]
/ replace the current-time function (used by tests and simulation)
.z.m.cp:f;
};

/ configuration defaults - overridden by the config dictionary passed to init
enabled:1b; / whether heartbeat publishing / checking is enabled
subenabled:0b; / whether this process monitors (subscribes to) other heartbeats
debug:1b; / whether to log warning / error transitions
publishinterval:0D00:00:30; / how often heartbeats are published
checkinterval:0D00:00:10; / how often received heartbeats are checked
warningtolerance:1.5; / warning after warningtolerance*publishinterval without a beat
errortolerance:2f; / error after errortolerance*publishinterval without a beat
proctype:`unknown; / this process's type (published as sym)
procname:.z.h; / this process's name
pid:.z.i; / this process's pid
host:.z.h; / this process's host
port:`int$system"p"; / this process's port
connections:`$(); / process types this monitor should subscribe to
onwarning:{[procs]}; / callback fired with the rows entering warning state
onerror:{[procs]}; / callback fired with the rows entering error state

/ recognised configuration keys - anything else passed via config is ignored
configkeys:`enabled`subenabled`debug`publishinterval`checkinterval`warningtolerance`errortolerance`proctype`procname`pid`host`port`connections`onwarning`onerror;

/ warning / error grace periods - vary by process type if required
warningperiod:{[processtype] `timespan$warningtolerance*publishinterval};
errorperiod:{[processtype] `timespan$errortolerance*publishinterval};

requiredep:{[deps;name]
/ extract a required dependency dictionary, erroring immediately if absent or null
d:$[99h=type deps;$[(name in key deps) and not (::)~deps name;deps name;()!()];()!()];
if[not count d;
'"di.heartbeat: ",(string name)," dependency is required; pass it via init deps - see di.",string name];
d
};

setdeps:{[deps]
/ extract and store the required dependencies, flattened for access via .z.m
/ log, timer and pubsub are always required; servers and handlers only when monitoring
logdict:requiredep[deps;`log];
.z.m.loginfo:logdict`info;
.z.m.logwarn:logdict`warn;
.z.m.logerr:logdict`error;
timerdict:requiredep[deps;`timer];
.z.m.timeraddjob:timerdict`addjob;
pubsubdict:requiredep[deps;`pubsub];
.z.m.pubsubpublish:pubsubdict`publish;
.z.m.pubsubsubscribe:pubsubdict`subscribe;
if[subenabled;
serversdict:requiredep[deps;`servers];
.z.m.serversgetservers:serversdict`getservers;
handlersdict:requiredep[deps;`handlers];
.z.m.handlersregister:handlersdict`register];
};

setconfig:{[config]
/ apply recognised configuration overrides (a dictionary) on top of current values
cfg:$[99h=type config;config;()!()];
ks:configkeys inter key cfg;
(.Q.dd[.z.M] each ks) set' cfg ks;
};

tosecs:{[span]
/ convert a timespan into whole seconds for the timer period
`int$span%0D00:00:01
};

registertimers:{
/ schedule the periodic heartbeat jobs via the injected timer
/ mode 2 = period after previous actual start - a heartbeat says "alive now", so
/ missed beats must not be replayed as a catch-up storm (which mode 1 would do)
if[enabled;
.z.m.timeraddjob[`hbpublish;publishheartbeat;();tosecs publishinterval;2;()!()];
.z.m.timeraddjob[`hbcheck;checkheartbeat;();tosecs checkinterval;2;()!()]];
if[subenabled;
.z.m.timeraddjob[`hbsubscribe;hbsubscriptions;();60;2;()!()]];
};

registerhandlers:{
/ wire the connection-close cleanup through the injected handler manager
if[subenabled;
.z.m.handlersregister[`.z.pc;`heartbeat;closeconnection]];
};

init:{[config;deps]
/ initialise the module with configuration and injected dependencies - see heartbeat.md
/ config - dictionary of configuration overrides (see configkeys), or (::) for defaults
/ deps - dictionary of injected dependencies keyed by name, each a dictionary of functions:
/ `log - `info`warn`error - required
/ `timer - `addjob (full di.timer dict may be passed) - required
/ `pubsub - `publish`subscribe - required
/ `servers - `getservers - required when subenabled
/ `handlers - `register`remove`list - required when subenabled
/ note: the module keeps its own clock (cp, default .z.p) - override via setcp
/ example:
/ heartbeat.init[`proctype`procname!(`rdb;`rdb1); `log`timer`pubsub!(logdep;timerdep;psdep)]
setconfig config;
setdeps deps;
registertimers[];
registerhandlers[];
.z.m.loginfo[`heartbeat;"di.heartbeat initialised"];
};

publishheartbeat:{
/ publish a single heartbeat row over pub/sub and bump the counter
if[not enabled;:()];
.z.m.pubsubpublish[`heartbeat;enlist `time`sym`procname`counter`pid`host`port!(cp[];proctype;procname;hbcounter;pid;host;port)];
.z.m.hbcounter:hbcounter+1;
};

storeheartbeat:{[batch]
/ store one or more incoming heartbeats, keeping the latest per process and
/ clearing warning / error state - call this from upd when a heartbeat arrives
.z.m.hb:hb upsert update warning:0b,error:0b from select by sym,procname from batch;
};

addprocs:{[proctypes;procnames]
/ seed the store with expected processes so a never-seen process is flagged
/ real heartbeats arriving later override these seeded rows
seed:2!([]sym:proctypes,();procname:procnames,();time:cp[];counter:0N;pid:0Ni;host:`;port:0Ni;warning:0b;error:0b);
.z.m.hb:seed,hb;
};

logwarnproc:{[r]
/ log a single process moving into warning state
.z.m.logwarn[`heartbeat;"process ",(string r`procname)," (type ",(string r`sym),") has not heartbeated since ",string r`time];
};

logerrproc:{[r]
/ log a single process moving into error state
.z.m.logerr[`heartbeat;"process ",(string r`procname)," (type ",(string r`sym),") has not heartbeated since ",string r`time];
};

warn:{[procs]
/ move processes into warning state, log and fire the warning callback
if[debug;logwarnproc each 0!procs];
.z.m.hb:hb upsert select sym,procname,warning:1b from procs;
onwarning procs;
};

err:{[procs]
/ move processes into error state, log and fire the error callback
if[debug;logerrproc each 0!procs];
.z.m.hb:hb upsert select sym,procname,error:1b from procs;
onerror procs;
};

checkheartbeat:{
/ flag processes that have not heartbeated within the warning / error grace periods
/ status: 0 healthy, 1 warning, 2+ error
/ grace periods are computed as locals first - module functions do not resolve inside qsql
now:cp[];
t:0!hb;
wp:warningperiod each t`sym;
ep:errorperiod each t`sym;
stats:update status:(`short$now>time+wp)+`short$2*now>time+ep from t;
newwarn:select sym,procname,time from stats where status=1,not warning;
newerr:select sym,procname,time from stats where status>1,not error;
if[count newwarn;warn newwarn];
if[count newerr;err newerr];
};

subscribeone:{[h]
/ subscribe to a single remote heartbeat publisher, logging and skipping on failure
ok:@[{.z.m.pubsubsubscribe x;1b};h;{[h;e] .z.m.logerr[`heartbeat;"failed to subscribe to heartbeats on handle ",(string h),": ",e];0b}[h]];
if[ok;.z.m.subscribedhandles:distinct subscribedhandles,h];
};

subscribe:{[handles]
/ subscribe to heartbeats on the given remote handle(s), tracking successful subscriptions
subscribeone each (),handles;
};

getheartbeats:{[proctype]
/ subscribe to publishers of the given process type(s) that are not yet subscribed
handles:(.z.m.serversgetservers proctype) except subscribedhandles;
if[count handles;
.z.m.loginfo[`heartbeat;"subscribing to new heartbeat handle(s) ",", " sv string handles];
subscribe handles];
};

hbsubscriptions:{
/ subscribe to all configured heartbeat publishers (by configured process type)
getheartbeats connections;
};

closeconnection:{[h]
/ drop a closed handle from the tracked subscriptions - registered against .z.pc
.z.m.subscribedhandles:subscribedhandles except h;
};

gethb:{
/ return the current heartbeat store for inspection
hb
};
8 changes: 8 additions & 0 deletions di/heartbeat/init.q
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/ load core functionality into the module
\l ::heartbeat.q

/ module version - compared against dependants' minimum requirements by di.depcheck
version:"0.1.0";

/ public api - only the functions intended to be called externally are exported
export:([init;publishheartbeat;checkheartbeat;storeheartbeat;addprocs;subscribe;hbsubscriptions;gethb;setcp;version])
Loading