Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions di/kafka/init.q
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
/ kafka consumer/producer interface - wraps kafkaq native library

\l ::kafka.q

export:([init;initconsumer;initproducer;cleanupconsumer;cleanupproducer;subscribe;publish;setkupd])
203 changes: 203 additions & 0 deletions di/kafka/kafka.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
# di.kafka

Kafka consumer and producer interface for kdb+ processes. Wraps the `kafkaq` native C shared library to provide consumer initialisation, producer initialisation, topic subscription, and message publishing. Incoming messages are delivered to a configurable `kupd` callback in the root namespace.

---

## Features

- Connect kdb+ processes to a Kafka broker as consumer, producer, or both
- Subscribe to topics and receive messages via a configurable callback function
- Publish byte vectors to topics with optional message keys
- Graceful disabled mode on unsupported platforms - all native functions are replaced with informative stubs
- Message handler (`kupd`) configurable at init or updated live via `setkupd`
- Compatible with any logging implementation via the injected `log` dependency

---

## Dependencies

**Native library:** `kafkaq.so` (Linux) or `kafkaq.dll` (Windows). Must be present on the host. Path is provided via the `libpath` config key or derived from the `KDBLIB` environment variable.

**Injected:** `log` - required. Pass `info`, `warn`, and `error` log functions via the `deps` argument to `init`. See `di.log` or the Confluence documentation for the expected function signatures.

---

## Initialisation

```q
log:use`di.log
log.init[logconfig]
logdep:enlist[`log]!enlist `info`warn`error!(log.info;log.warn;log.error)

kafka:use`di.kafka
kafka.init[`enabled`libpath!(1b;`$/opt/TorQ/lib/l64/kafkaq);logdep]
```

All config keys are optional. Passing `(::)` or `()!()` as config applies defaults.

| Key | Type | Default | Description |
|---|---|---|---|
| `enabled` | boolean | `1b` on `l64`, `0b` on all other platforms | Whether to load the native library. On a non-`l64` host the module loads in disabled mode unless explicitly overridden. |
| `libpath` | symbol | derived from `KDBLIB` env var | Path to the kafkaq library **without** file extension. If `KDBLIB` is not set, provide this explicitly. |
| `kupd` | function | logs message via injected logger | Message handler called by the C library on Kafka message receipt. Signature: `{[key;bytes]}`. |

The `log` dependency is **required**. `init` throws if it is absent, `(::)`, or missing any of `info`/`warn`/`error`.

`init` must be called before any other function is used. It is safe to call multiple times.

---

## Functions

### `init`
```q
kafka.init[config;deps]
```
Initialises the module. Loads the native library when `enabled:1b` and the library is found. Sets the global `kupd` callback for the C library.

### `initconsumer`
```q
kafka.initconsumer[`localhost:9092;`fetch.wait.max.ms`fetch.error.backoff.ms!`5`5]
```
Initialises a Kafka consumer and connects to the broker.

### `initproducer`
```q
kafka.initproducer[`localhost:9092;`queue.buffering.max.ms`batch.num.messages!`5`1]
```
Initialises a Kafka producer and connects to the broker.

### `cleanupconsumer`
```q
kafka.cleanupconsumer[(::)]
```
Disconnects and frees the consumer object, stopping the subscription thread.

### `cleanupproducer`
```q
kafka.cleanupproducer[(::)]
```
Disconnects and frees the producer object.

### `subscribe`
```q
kafka.subscribe[`mytopic;0]
```
Starts the subscription thread for a topic on a given partition. Messages are delivered to `kupd`.

### `publish`
```q
kafka.publish[`mytopic;0;`;`byte$"hello world"]
kafka.publish[`mytopic;0;`mykey;`byte$"hello world"]
```
Publishes a byte vector to a topic and partition with an optional symbol key.

### `setkupd`
```q
kafka.setkupd[{[k;x] upd[`kafkadata;(enlist k;enlist x)]}]
```
Updates the message handler after `init` without requiring a full reinitialisation. Also updates the global `kupd` in the root namespace if the module is enabled.

`init` must be called before `setkupd`.

> **Warning:** Avoid calling `setkupd` while a subscription is active. The Kafka C library delivers messages on a background thread and may invoke the old handler after the swap.

---

## Usage example

```q
log:use`di.log
log.init[logconfig]
logdep:enlist[`log]!enlist `info`warn`error!(log.info;log.warn;log.error)

myhandler:{[k;x]
upd[`kafkadata;(enlist .z.p;enlist k;enlist "c"$x)];
};

kafka:use`di.kafka
kafka.init[`enabled`libpath`kupd!(1b;`$/opt/TorQ/lib/l64/kafkaq;myhandler);logdep]

kafka.initconsumer[`localhost:9092;()!()]
kafka.subscribe[`trades;0]

kafka.initproducer[`localhost:9092;()!()]
kafka.publish[`trades;0;`;`byte$"test message"]

kafka.cleanupconsumer[(::)]
kafka.cleanupproducer[(::)]
```

---

## Global side effect

When `enabled:1b` and the library loads successfully, `init` sets `kupd` in the **root namespace**:

```q
@[`.;`kupd;:;.z.m.kupd]
```

This is an unavoidable consequence of the native C library's design - kafkaq is compiled to call `kupd` by name in the root namespace. `setkupd` also updates the global when enabled.

---

## Disabled mode

When `enabled:0b` - either because the platform default resolved to `0b` or it was explicitly set - every native function (`initconsumer`, `initproducer`, `cleanupconsumer`, `cleanupproducer`, `subscribe`, `publish`) is a stub that throws:

```
'kafka not enabled
```

This is the expected behaviour when kafka is not set up. It distinguishes "kafka isn't configured" from a genuine broker or argument error.

If `enabled:1b` and the library file is not found, `init` logs an error and continues rather than throwing - the process stays alive and all functions remain as stubs. This matches TorQ's original behaviour: a missing native library degrades the module to disabled rather than halting the process.

> **Note:** If kafka silently isn't working after `init` with `enabled:1b`, check the injected logger's error output. `init` logs the missing library path but does not throw - there is nothing to catch.

---

## TorQ migration

| TorQ pattern | Module equivalent |
|---|---|
| `enabled:@[value;\`enabled;.z.o in \`l64]` | `kafka.init[\`enabled!enlist 1b;logdep]` |
| `kupd:{[k;x]...}` defined in settings | `kafka.init[\`kupd!enlist{[k;x]...};logdep]` |
| default `kupd` prints bytes to stdout (`-1 \`char$x`) | default `kupd` routes through injected logger - override with `kupd` config key if stdout behaviour is needed |
| `.kafka.initconsumer[s;o]` | `kafka.initconsumer[s;o]` |
| `.kafka.initproducer[s;o]` | `kafka.initproducer[s;o]` |
| `.kafka.cleanupconsumer[(::)]` | `kafka.cleanupconsumer[(::)]` |
| `.kafka.cleanupproducer[(::)]` | `kafka.cleanupproducer[(::)]` |
| `.kafka.subscribe[t;p]` | `kafka.subscribe[t;p]` |
| `.kafka.publish[t;p;k;m]` | `kafka.publish[t;p;k;m]` |

---

## Manual verification (requires kafkaq.so and Kafka broker)

```q
/ verify lib loads
kafka.init[`enabled`libpath!(1b;`$/path/to/l64/kafkaq);logdep]

/ verify consumer and subscription
kafka.initconsumer[`localhost:9092;()!()]
kafka.subscribe[`test;0]

/ verify publish
kafka.initproducer[`localhost:9092;()!()]
kafka.publish[`test;0;`;`byte$"hello from kdb+"]

/ cleanup
kafka.cleanupconsumer[(::)]
kafka.cleanupproducer[(::)]
```

---

## Notes

- `enabled` defaults to `1b` only on `l64` hosts; everywhere else it defaults to `0b`. Pass `enabled:1b` explicitly to force-enable on a platform where a native library build is available.
- The `log` dependency is required with no fallback. This differs from TorQ, which has no equivalent concept of injected logging - this module requires it for consistency with other `di.*` modules.
- The default `kupd` routes through the injected logger rather than TorQ's original raw stdout write (`-1 \`char$x`). Override with the `kupd` config key if stdout behaviour is needed.
90 changes: 90 additions & 0 deletions di/kafka/kafka.q
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/ kafka consumer/producer interface - wraps kafkaq native library

/ default message handler - routes message through injected logger
defaultkupd:{[k;x].z.m.loginfo[`kafka;"kupd: ","c"$x]};

/ default lib path derived from KDBLIB env var and os string
/ override with libpath config key if KDBLIB is not set in the environment
defaultlib:`$getenv[`KDBLIB],"/",string[.z.o],"/kafkaq";

/ ============================================================
/ module state and defaults
/ ============================================================

/ whether native library is loaded - overwritten by init; true by default on l64
enabled:.z.o in `l64;

/ path to kafkaq library without file extension - overwritten by init
lib:defaultlib;

/ message handler called by C library on message receipt - overwritten by init
kupd:defaultkupd;

/ ============================================================
/ native function stubs - replaced by init when enabled:1b and library loads
/ ============================================================

/ initialise consumer with broker address and option dictionary
initconsumer:{[s;o]'"kafka not enabled"};

/ initialise producer with broker address and option dictionary
initproducer:{[s;o]'"kafka not enabled"};

/ disconnect and free consumer object and stop subscription thread - rank-1 matches C lib 2:(`cleanupconsumer;1)
cleanupconsumer:{[x]'"kafka not enabled"};

/ disconnect and free producer object - rank-1 matches C lib 2:(`cleanupproducer;1)
cleanupproducer:{[x]'"kafka not enabled"};

/ start subscription thread for topic on partition - messages delivered to kupd
subscribe:{[t;p]'"kafka not enabled"};

/ publish byte vector to topic and partition with given key
publish:{[t;p;k;m]'"kafka not enabled"};

/ ============================================================
/ public api
/ ============================================================

setkupd:{[f]
/ update message handler; propagate to global kupd for C callback if enabled
.z.m.kupd:f;
if[.z.m.enabled;@[`.;`kupd;:;f]];
};

init:{[config;deps]
/ config: dict with optional keys `enabled`libpath`kupd
/ deps: `log!(logdict)
/ `log: `info`warn`error!(infofunc;warnfunc;errfunc) - required
logdict:$[99h=type deps;$[(`log in key deps) and not (::)~deps`log;deps`log;()!()];()!()];
if[not all `info`warn`error in key logdict;
'"di.kafka: log dependency is required; pass `info`warn`error functions - see di.log or refer to confluence documentation";
];
.z.m.loginfo:logdict`info;
.z.m.logwarn:logdict`warn;
.z.m.logerr:logdict`error;
/ normalise config - handles (::) and ()!() identically
cfg:$[99h=type config;config;()!()];
.z.m.enabled:$[`enabled in key cfg;cfg`enabled;.z.o in `l64];
.z.m.lib:$[`libpath in key cfg;cfg`libpath;defaultlib];
.z.m.kupd:$[`kupd in key cfg;cfg`kupd;defaultkupd];
if[.z.m.enabled;
libfile:hsym ` sv .z.m.lib,$[.z.o like "w*";`dll;`so];
/ protected key - kdbx throws on paths with non-existent ancestors
libexists:@[{not ()~key x};libfile;{0b}];
if[not libexists;
.z.m.logerr[`kafka;"no such file ",1_string libfile]
];
if[libexists;
.z.m.initconsumer:.z.m.lib 2:(`initconsumer;2);
.z.m.initproducer:.z.m.lib 2:(`initproducer;2);
.z.m.cleanupconsumer:.z.m.lib 2:(`cleanupconsumer;1);
.z.m.cleanupproducer:.z.m.lib 2:(`cleanupproducer;1);
.z.m.subscribe:.z.m.lib 2:(`subscribe;2);
.z.m.publish:.z.m.lib 2:(`publish;4);
/ set global kupd - unavoidable side effect of native library design
@[`.;`kupd;:;.z.m.kupd];
.z.m.loginfo[`kafka;"kupd is set to ",-3!.z.m.kupd];
];
];
};
49 changes: 49 additions & 0 deletions di/kafka/test.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
action,ms,bytes,lang,code,repeat,minver,comment
/ pre-test setup
before,0,0,q,kafka:use`di.kafka,1,1,load di.kafka module
before,0,0,q,mocklog:`info`warn`error!({[c;m]};{[c;m]};{[c;m]}),1,1,no-op mock log dependency
before,0,0,q,logdep:enlist[`log]!enlist mocklog,1,1,log dep dict
before,0,0,q,kafkaterrored:0b,1,1,flag used to prove enabled:0b branch was skipped not merely unsuccessful
before,0,0,q,capturelog:`info`warn`error!({[c;m]};{[c;m]};{[c;m] `kafkaterrored set 1b}),1,1,capturing mock logger - error fn uses set builtin; only reliable when not called from within module execution context

/ logdict extraction - invalid deps inputs throw correct message
true,0,0,q,.[kafka.init;(()!();()!());{x}] like "*log dependency*",1,1,empty dict deps throws correct message
true,0,0,q,.[kafka.init;(()!();(::));{x}] like "*log dependency*",1,1,:: as deps throws correct message
true,0,0,q,.[kafka.init;(()!();42);{x}] like "*log dependency*",1,1,non-dict deps throws correct message
true,0,0,q,.[kafka.init;(()!();enlist[`log]!enlist(::));{x}] like "*log dependency*",1,1,log value of :: throws correct message
true,0,0,q,.[kafka.init;(()!();enlist[`notlog]!enlist mocklog);{x}] like "*log dependency*",1,1,deps without log key throws correct message
true,0,0,q,.[kafka.init;(()!();enlist[`log]!enlist `info`warn!(mocklog`info;mocklog`warn));{x}] like "*log dependency*",1,1,partial log dict missing error key throws correct message

/ init disabled - stubs throw with correct message
run,0,0,q,kafka.init[()!();logdep],1,1,init with empty config does not error
true,0,0,q,.[kafka.initconsumer;(`localhost:9092;()!());{x}] like "*kafka not enabled*",1,1,initconsumer stub throws correct message
true,0,0,q,.[kafka.initproducer;(`localhost:9092;()!());{x}] like "*kafka not enabled*",1,1,initproducer stub throws correct message
true,0,0,q,.[kafka.cleanupconsumer;enlist(::);{x}] like "*kafka not enabled*",1,1,cleanupconsumer stub throws correct message
true,0,0,q,.[kafka.cleanupproducer;enlist(::);{x}] like "*kafka not enabled*",1,1,cleanupproducer stub throws correct message
true,0,0,q,.[kafka.subscribe;(`test;0);{x}] like "*kafka not enabled*",1,1,subscribe stub throws correct message
true,0,0,q,.[kafka.publish;(`test;0;`;0x68656c6c6f);{x}] like "*kafka not enabled*",1,1,publish stub throws correct message

/ init - explicit enabled:0b: capturelog proves lib-loading branch was skipped not merely unsuccessful
run,0,0,q,kafka.init[enlist[`enabled]!enlist 0b;enlist[`log]!enlist capturelog],1,1,init with enabled:0b does not error
true,0,0,q,not kafkaterrored,1,1,logerr was not called - lib-loading branch was genuinely skipped
true,0,0,q,.[kafka.subscribe;(`test;0);{x}] like "*kafka not enabled*",1,1,subscribe throws correct message after explicit disabled init

/ init - :: config
run,0,0,q,kafka.init[::;logdep],1,1,init with :: config does not error

/ init - enabled:1b with missing lib: process continues with stubs intact
/ note: logerr is called internally but cannot be captured from within kdbx module execution context
run,0,0,q,kafka.init[`enabled`libpath!(1b;`fakekafkaq);logdep],1,1,init with enabled:1b and missing lib does not throw
true,0,0,q,.[kafka.subscribe;(`test;0);{x}] like "*kafka not enabled*",1,1,stubs remain after failed lib load

/ setkupd - state unobservable without live broker; test confirms no throw only
run,0,0,q,kafka.init[()!();logdep],1,1,reinit with no-op log before setkupd test
run,0,0,q,kafka.setkupd[{[k;x]}],1,1,setkupd does not error when disabled - internal state unobservable without live broker

/ init - custom kupd
run,0,0,q,kafka.init[(enlist`kupd)!enlist{[k;x]};logdep],1,1,init with custom kupd does not error

/ multiple consecutive init calls do not corrupt state
run,0,0,q,kafka.init[()!();logdep],1,1,second consecutive init does not error
run,0,0,q,kafka.init[enlist[`enabled]!enlist 0b;logdep],1,1,third consecutive init with enabled:0b does not error
true,0,0,q,.[kafka.subscribe;(`test;0);{x}] like "*kafka not enabled*",1,1,stubs remain correct after multiple init calls