diff --git a/di/kafka/init.q b/di/kafka/init.q new file mode 100644 index 00000000..eac93cc8 --- /dev/null +++ b/di/kafka/init.q @@ -0,0 +1,5 @@ +/ kafka consumer/producer interface - wraps kafkaq native library + +\l ::kafka.q + +export:([init;initconsumer;initproducer;cleanupconsumer;cleanupproducer;subscribe;publish;setkupd]) diff --git a/di/kafka/kafka.md b/di/kafka/kafka.md new file mode 100644 index 00000000..8c222f74 --- /dev/null +++ b/di/kafka/kafka.md @@ -0,0 +1,203 @@ +# di.kafka + +Kafka consumer and producer interface for kdb+ processes. Wraps the `kafkaq` native C shared library to provide consumer initialisation, producer initialisation, topic subscription, and message publishing. Incoming messages are delivered to a configurable `kupd` callback in the root namespace. + +--- + +## Features + +- Connect kdb+ processes to a Kafka broker as consumer, producer, or both +- Subscribe to topics and receive messages via a configurable callback function +- Publish byte vectors to topics with optional message keys +- Graceful disabled mode on unsupported platforms - all native functions are replaced with informative stubs +- Message handler (`kupd`) configurable at init or updated live via `setkupd` +- Compatible with any logging implementation via the injected `log` dependency + +--- + +## Dependencies + +**Native library:** `kafkaq.so` (Linux) or `kafkaq.dll` (Windows). Must be present on the host. Path is provided via the `libpath` config key or derived from the `KDBLIB` environment variable. + +**Injected:** `log` - required. Pass `info`, `warn`, and `error` log functions via the `deps` argument to `init`. See `di.log` or the Confluence documentation for the expected function signatures. + +--- + +## Initialisation + +```q +log:use`di.log +log.init[logconfig] +logdep:enlist[`log]!enlist `info`warn`error!(log.info;log.warn;log.error) + +kafka:use`di.kafka +kafka.init[`enabled`libpath!(1b;`$/opt/TorQ/lib/l64/kafkaq);logdep] +``` + +All config keys are optional. Passing `(::)` or `()!()` as config applies defaults. + +| Key | Type | Default | Description | +|---|---|---|---| +| `enabled` | boolean | `1b` on `l64`, `0b` on all other platforms | Whether to load the native library. On a non-`l64` host the module loads in disabled mode unless explicitly overridden. | +| `libpath` | symbol | derived from `KDBLIB` env var | Path to the kafkaq library **without** file extension. If `KDBLIB` is not set, provide this explicitly. | +| `kupd` | function | logs message via injected logger | Message handler called by the C library on Kafka message receipt. Signature: `{[key;bytes]}`. | + +The `log` dependency is **required**. `init` throws if it is absent, `(::)`, or missing any of `info`/`warn`/`error`. + +`init` must be called before any other function is used. It is safe to call multiple times. + +--- + +## Functions + +### `init` +```q +kafka.init[config;deps] +``` +Initialises the module. Loads the native library when `enabled:1b` and the library is found. Sets the global `kupd` callback for the C library. + +### `initconsumer` +```q +kafka.initconsumer[`localhost:9092;`fetch.wait.max.ms`fetch.error.backoff.ms!`5`5] +``` +Initialises a Kafka consumer and connects to the broker. + +### `initproducer` +```q +kafka.initproducer[`localhost:9092;`queue.buffering.max.ms`batch.num.messages!`5`1] +``` +Initialises a Kafka producer and connects to the broker. + +### `cleanupconsumer` +```q +kafka.cleanupconsumer[(::)] +``` +Disconnects and frees the consumer object, stopping the subscription thread. + +### `cleanupproducer` +```q +kafka.cleanupproducer[(::)] +``` +Disconnects and frees the producer object. + +### `subscribe` +```q +kafka.subscribe[`mytopic;0] +``` +Starts the subscription thread for a topic on a given partition. Messages are delivered to `kupd`. + +### `publish` +```q +kafka.publish[`mytopic;0;`;`byte$"hello world"] +kafka.publish[`mytopic;0;`mykey;`byte$"hello world"] +``` +Publishes a byte vector to a topic and partition with an optional symbol key. + +### `setkupd` +```q +kafka.setkupd[{[k;x] upd[`kafkadata;(enlist k;enlist x)]}] +``` +Updates the message handler after `init` without requiring a full reinitialisation. Also updates the global `kupd` in the root namespace if the module is enabled. + +`init` must be called before `setkupd`. + +> **Warning:** Avoid calling `setkupd` while a subscription is active. The Kafka C library delivers messages on a background thread and may invoke the old handler after the swap. + +--- + +## Usage example + +```q +log:use`di.log +log.init[logconfig] +logdep:enlist[`log]!enlist `info`warn`error!(log.info;log.warn;log.error) + +myhandler:{[k;x] + upd[`kafkadata;(enlist .z.p;enlist k;enlist "c"$x)]; + }; + +kafka:use`di.kafka +kafka.init[`enabled`libpath`kupd!(1b;`$/opt/TorQ/lib/l64/kafkaq;myhandler);logdep] + +kafka.initconsumer[`localhost:9092;()!()] +kafka.subscribe[`trades;0] + +kafka.initproducer[`localhost:9092;()!()] +kafka.publish[`trades;0;`;`byte$"test message"] + +kafka.cleanupconsumer[(::)] +kafka.cleanupproducer[(::)] +``` + +--- + +## Global side effect + +When `enabled:1b` and the library loads successfully, `init` sets `kupd` in the **root namespace**: + +```q +@[`.;`kupd;:;.z.m.kupd] +``` + +This is an unavoidable consequence of the native C library's design - kafkaq is compiled to call `kupd` by name in the root namespace. `setkupd` also updates the global when enabled. + +--- + +## Disabled mode + +When `enabled:0b` - either because the platform default resolved to `0b` or it was explicitly set - every native function (`initconsumer`, `initproducer`, `cleanupconsumer`, `cleanupproducer`, `subscribe`, `publish`) is a stub that throws: + +``` +'kafka not enabled +``` + +This is the expected behaviour when kafka is not set up. It distinguishes "kafka isn't configured" from a genuine broker or argument error. + +If `enabled:1b` and the library file is not found, `init` logs an error and continues rather than throwing - the process stays alive and all functions remain as stubs. This matches TorQ's original behaviour: a missing native library degrades the module to disabled rather than halting the process. + +> **Note:** If kafka silently isn't working after `init` with `enabled:1b`, check the injected logger's error output. `init` logs the missing library path but does not throw - there is nothing to catch. + +--- + +## TorQ migration + +| TorQ pattern | Module equivalent | +|---|---| +| `enabled:@[value;\`enabled;.z.o in \`l64]` | `kafka.init[\`enabled!enlist 1b;logdep]` | +| `kupd:{[k;x]...}` defined in settings | `kafka.init[\`kupd!enlist{[k;x]...};logdep]` | +| default `kupd` prints bytes to stdout (`-1 \`char$x`) | default `kupd` routes through injected logger - override with `kupd` config key if stdout behaviour is needed | +| `.kafka.initconsumer[s;o]` | `kafka.initconsumer[s;o]` | +| `.kafka.initproducer[s;o]` | `kafka.initproducer[s;o]` | +| `.kafka.cleanupconsumer[(::)]` | `kafka.cleanupconsumer[(::)]` | +| `.kafka.cleanupproducer[(::)]` | `kafka.cleanupproducer[(::)]` | +| `.kafka.subscribe[t;p]` | `kafka.subscribe[t;p]` | +| `.kafka.publish[t;p;k;m]` | `kafka.publish[t;p;k;m]` | + +--- + +## Manual verification (requires kafkaq.so and Kafka broker) + +```q +/ verify lib loads +kafka.init[`enabled`libpath!(1b;`$/path/to/l64/kafkaq);logdep] + +/ verify consumer and subscription +kafka.initconsumer[`localhost:9092;()!()] +kafka.subscribe[`test;0] + +/ verify publish +kafka.initproducer[`localhost:9092;()!()] +kafka.publish[`test;0;`;`byte$"hello from kdb+"] + +/ cleanup +kafka.cleanupconsumer[(::)] +kafka.cleanupproducer[(::)] +``` + +--- + +## Notes + +- `enabled` defaults to `1b` only on `l64` hosts; everywhere else it defaults to `0b`. Pass `enabled:1b` explicitly to force-enable on a platform where a native library build is available. +- The `log` dependency is required with no fallback. This differs from TorQ, which has no equivalent concept of injected logging - this module requires it for consistency with other `di.*` modules. +- The default `kupd` routes through the injected logger rather than TorQ's original raw stdout write (`-1 \`char$x`). Override with the `kupd` config key if stdout behaviour is needed. diff --git a/di/kafka/kafka.q b/di/kafka/kafka.q new file mode 100644 index 00000000..ffe5b9c1 --- /dev/null +++ b/di/kafka/kafka.q @@ -0,0 +1,90 @@ +/ kafka consumer/producer interface - wraps kafkaq native library + +/ default message handler - routes message through injected logger +defaultkupd:{[k;x].z.m.loginfo[`kafka;"kupd: ","c"$x]}; + +/ default lib path derived from KDBLIB env var and os string +/ override with libpath config key if KDBLIB is not set in the environment +defaultlib:`$getenv[`KDBLIB],"/",string[.z.o],"/kafkaq"; + +/ ============================================================ +/ module state and defaults +/ ============================================================ + +/ whether native library is loaded - overwritten by init; true by default on l64 +enabled:.z.o in `l64; + +/ path to kafkaq library without file extension - overwritten by init +lib:defaultlib; + +/ message handler called by C library on message receipt - overwritten by init +kupd:defaultkupd; + +/ ============================================================ +/ native function stubs - replaced by init when enabled:1b and library loads +/ ============================================================ + +/ initialise consumer with broker address and option dictionary +initconsumer:{[s;o]'"kafka not enabled"}; + +/ initialise producer with broker address and option dictionary +initproducer:{[s;o]'"kafka not enabled"}; + +/ disconnect and free consumer object and stop subscription thread - rank-1 matches C lib 2:(`cleanupconsumer;1) +cleanupconsumer:{[x]'"kafka not enabled"}; + +/ disconnect and free producer object - rank-1 matches C lib 2:(`cleanupproducer;1) +cleanupproducer:{[x]'"kafka not enabled"}; + +/ start subscription thread for topic on partition - messages delivered to kupd +subscribe:{[t;p]'"kafka not enabled"}; + +/ publish byte vector to topic and partition with given key +publish:{[t;p;k;m]'"kafka not enabled"}; + +/ ============================================================ +/ public api +/ ============================================================ + +setkupd:{[f] + / update message handler; propagate to global kupd for C callback if enabled + .z.m.kupd:f; + if[.z.m.enabled;@[`.;`kupd;:;f]]; + }; + +init:{[config;deps] + / config: dict with optional keys `enabled`libpath`kupd + / deps: `log!(logdict) + / `log: `info`warn`error!(infofunc;warnfunc;errfunc) - required + logdict:$[99h=type deps;$[(`log in key deps) and not (::)~deps`log;deps`log;()!()];()!()]; + if[not all `info`warn`error in key logdict; + '"di.kafka: log dependency is required; pass `info`warn`error functions - see di.log or refer to confluence documentation"; + ]; + .z.m.loginfo:logdict`info; + .z.m.logwarn:logdict`warn; + .z.m.logerr:logdict`error; + / normalise config - handles (::) and ()!() identically + cfg:$[99h=type config;config;()!()]; + .z.m.enabled:$[`enabled in key cfg;cfg`enabled;.z.o in `l64]; + .z.m.lib:$[`libpath in key cfg;cfg`libpath;defaultlib]; + .z.m.kupd:$[`kupd in key cfg;cfg`kupd;defaultkupd]; + if[.z.m.enabled; + libfile:hsym ` sv .z.m.lib,$[.z.o like "w*";`dll;`so]; + / protected key - kdbx throws on paths with non-existent ancestors + libexists:@[{not ()~key x};libfile;{0b}]; + if[not libexists; + .z.m.logerr[`kafka;"no such file ",1_string libfile] + ]; + if[libexists; + .z.m.initconsumer:.z.m.lib 2:(`initconsumer;2); + .z.m.initproducer:.z.m.lib 2:(`initproducer;2); + .z.m.cleanupconsumer:.z.m.lib 2:(`cleanupconsumer;1); + .z.m.cleanupproducer:.z.m.lib 2:(`cleanupproducer;1); + .z.m.subscribe:.z.m.lib 2:(`subscribe;2); + .z.m.publish:.z.m.lib 2:(`publish;4); + / set global kupd - unavoidable side effect of native library design + @[`.;`kupd;:;.z.m.kupd]; + .z.m.loginfo[`kafka;"kupd is set to ",-3!.z.m.kupd]; + ]; + ]; + }; diff --git a/di/kafka/test.csv b/di/kafka/test.csv new file mode 100644 index 00000000..305895c2 --- /dev/null +++ b/di/kafka/test.csv @@ -0,0 +1,49 @@ +action,ms,bytes,lang,code,repeat,minver,comment +/ pre-test setup +before,0,0,q,kafka:use`di.kafka,1,1,load di.kafka module +before,0,0,q,mocklog:`info`warn`error!({[c;m]};{[c;m]};{[c;m]}),1,1,no-op mock log dependency +before,0,0,q,logdep:enlist[`log]!enlist mocklog,1,1,log dep dict +before,0,0,q,kafkaterrored:0b,1,1,flag used to prove enabled:0b branch was skipped not merely unsuccessful +before,0,0,q,capturelog:`info`warn`error!({[c;m]};{[c;m]};{[c;m] `kafkaterrored set 1b}),1,1,capturing mock logger - error fn uses set builtin; only reliable when not called from within module execution context + +/ logdict extraction - invalid deps inputs throw correct message +true,0,0,q,.[kafka.init;(()!();()!());{x}] like "*log dependency*",1,1,empty dict deps throws correct message +true,0,0,q,.[kafka.init;(()!();(::));{x}] like "*log dependency*",1,1,:: as deps throws correct message +true,0,0,q,.[kafka.init;(()!();42);{x}] like "*log dependency*",1,1,non-dict deps throws correct message +true,0,0,q,.[kafka.init;(()!();enlist[`log]!enlist(::));{x}] like "*log dependency*",1,1,log value of :: throws correct message +true,0,0,q,.[kafka.init;(()!();enlist[`notlog]!enlist mocklog);{x}] like "*log dependency*",1,1,deps without log key throws correct message +true,0,0,q,.[kafka.init;(()!();enlist[`log]!enlist `info`warn!(mocklog`info;mocklog`warn));{x}] like "*log dependency*",1,1,partial log dict missing error key throws correct message + +/ init disabled - stubs throw with correct message +run,0,0,q,kafka.init[()!();logdep],1,1,init with empty config does not error +true,0,0,q,.[kafka.initconsumer;(`localhost:9092;()!());{x}] like "*kafka not enabled*",1,1,initconsumer stub throws correct message +true,0,0,q,.[kafka.initproducer;(`localhost:9092;()!());{x}] like "*kafka not enabled*",1,1,initproducer stub throws correct message +true,0,0,q,.[kafka.cleanupconsumer;enlist(::);{x}] like "*kafka not enabled*",1,1,cleanupconsumer stub throws correct message +true,0,0,q,.[kafka.cleanupproducer;enlist(::);{x}] like "*kafka not enabled*",1,1,cleanupproducer stub throws correct message +true,0,0,q,.[kafka.subscribe;(`test;0);{x}] like "*kafka not enabled*",1,1,subscribe stub throws correct message +true,0,0,q,.[kafka.publish;(`test;0;`;0x68656c6c6f);{x}] like "*kafka not enabled*",1,1,publish stub throws correct message + +/ init - explicit enabled:0b: capturelog proves lib-loading branch was skipped not merely unsuccessful +run,0,0,q,kafka.init[enlist[`enabled]!enlist 0b;enlist[`log]!enlist capturelog],1,1,init with enabled:0b does not error +true,0,0,q,not kafkaterrored,1,1,logerr was not called - lib-loading branch was genuinely skipped +true,0,0,q,.[kafka.subscribe;(`test;0);{x}] like "*kafka not enabled*",1,1,subscribe throws correct message after explicit disabled init + +/ init - :: config +run,0,0,q,kafka.init[::;logdep],1,1,init with :: config does not error + +/ init - enabled:1b with missing lib: process continues with stubs intact +/ note: logerr is called internally but cannot be captured from within kdbx module execution context +run,0,0,q,kafka.init[`enabled`libpath!(1b;`fakekafkaq);logdep],1,1,init with enabled:1b and missing lib does not throw +true,0,0,q,.[kafka.subscribe;(`test;0);{x}] like "*kafka not enabled*",1,1,stubs remain after failed lib load + +/ setkupd - state unobservable without live broker; test confirms no throw only +run,0,0,q,kafka.init[()!();logdep],1,1,reinit with no-op log before setkupd test +run,0,0,q,kafka.setkupd[{[k;x]}],1,1,setkupd does not error when disabled - internal state unobservable without live broker + +/ init - custom kupd +run,0,0,q,kafka.init[(enlist`kupd)!enlist{[k;x]};logdep],1,1,init with custom kupd does not error + +/ multiple consecutive init calls do not corrupt state +run,0,0,q,kafka.init[()!();logdep],1,1,second consecutive init does not error +run,0,0,q,kafka.init[enlist[`enabled]!enlist 0b;logdep],1,1,third consecutive init with enabled:0b does not error +true,0,0,q,.[kafka.subscribe;(`test;0);{x}] like "*kafka not enabled*",1,1,stubs remain correct after multiple init calls