Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
266 changes: 266 additions & 0 deletions di/dbwrite/dbwrite.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,266 @@
# di.dbwrite

Write, sort, and attribute utilities for kdb+ processes that persist data to disk.

---

## Features

- Write in-memory tables to a date-partitioned HDB with `savedown` — enumerates syms, applies `p#` to `sym`, writes, then sorts
- Append rows to an existing partition with `appenddown` — enumerates syms and appends; sort separately when the partition is complete
- Sort on-disk table partitions by configured columns using `xasc`
- Apply kdb+ attributes (`p`, `s`, `g`, `u`) to on-disk columns after sort
- Sort and attribute behaviour driven by a CSV config file; a `default` row acts as a fallback
- A built-in `default` row in `params` provides an out-of-the-box fallback (sort by `time` ascending) when no config file is loaded
- Run `.Q.gc[]` with before/after memory logging
- All errors from sort, attribute application, and write operations are either caught-and-logged (sort, applyattr) or propagated to the caller (savedown, upsert)

---

## Dependencies

| Dependency | Key | Required | Description |
|---|---|---|---|
| `di.log` | `` `log `` | yes | Logging functions `info`, `warn`, `error` — each `{[ctx;msg] ...}` |

The `log` dependency must be passed to `init`. The module throws if it is absent or `(::)`.

---

## Sort config CSV

`loadconfig` reads a CSV with four columns:

| Column | Type | Description |
|---|---|---|
| `tabname` | symbol | Table name, or `` `default `` as a catch-all fallback |
| `att` | symbol | kdb+ attribute to apply: `p`, `s`, `g`, `u`, or empty for none |
| `column` | symbol | Column to sort or attribute |
| `sort` | boolean | `1b` — include in `xasc` sort key; `0b` — attribute only |

Example `sort.csv`:

```
tabname,att,column,sort
trade,p,sym,1
trade,,price,0
quote,p,sym,1
default,,time,1
```

Sorts `trade` by `sym`, applies `p` to `sym`. Tables not listed fall back to `default` and sort by `time`.

---

## Functions

### Summary

| Function | Description |
|---|---|
| `init[config;deps]` | Wire injected dependencies; must be called first |
| `savedown[dir;part;tabname;data]` | Write in-memory table to HDB partition, enumerate syms, apply `p#sym`, then sort |
| `appenddown[dir;part;tabname;data]` | Append rows to existing partition and enumerate syms; does not sort |
| `loadconfig[file]` | Load and validate the sort config CSV into module state |
| `sort[d]` | Sort an on-disk partition and apply attributes per config |
| `applyattr[dloc;colname;att]` | Apply a single kdb+ attribute to an on-disk column |
| `gc[]` | Run `.Q.gc[]` and log before/after memory stats |

---

### `init[config;deps]`

Wires injected dependencies into the module. Must be called before any other function.

**Parameters**

| Parameter | Type | Description |
|---|---|---|
| `config` | any | Accepted but unused; pass `(::)` |
| `deps` | dict | Must contain `` `log `` → `` `info`warn`error!(infofunc;warnfunc;errfunc) `` |

**Returns** — generic null.

Throws with a descriptive message if the `log` dependency is missing or set to `(::)`.

```q
log:use`di.log
log.init[logconfig]
logdep:`info`warn`error!(log.info;log.warn;log.error)

dbwrite:use`di.dbwrite
dbwrite.init[(::);(enlist`log)!enlist logdep]
```

---

### `savedown[dir;part;tabname;data]`

Writes an in-memory table to a date-partitioned HDB. Enumerates symbol columns against the HDB sym file, applies `p#` to `sym` if present, writes the partition, then calls `sort` to sort and apply attributes per the loaded config.

**Parameters**

| Parameter | Type | Description |
|---|---|---|
| `dir` | hsym | HDB root directory (e.g. `` `:hdb ``) |
| `part` | date/month/int | Partition value |
| `tabname` | symbol | Table name — determines the partition subdirectory |
| `data` | table | In-memory table to write |

**Returns** — generic null on success; throws on write failure.

If `loadconfig` has not been called, the built-in default row applies (sort by `time` ascending). If the table has no `sym` column, enumeration and `p#` are skipped.

```q
dbwrite.savedown[`:hdb;2024.01.02;`trade;data]
```

---

### `appenddown[dir;part;tabname;data]`

Appends rows to an existing on-disk partition. Enumerates symbol columns then appends; does not sort. Call `sort` explicitly when the partition is complete.

Keeping sort separate allows multiple intraday appends without the cost of re-sorting a growing partition on each call.

**Parameters**

| Parameter | Type | Description |
|---|---|---|
| `dir` | hsym | HDB root directory |
| `part` | date/month/int | Partition value |
| `tabname` | symbol | Table name |
| `data` | table | Rows to append |

**Returns** — generic null on success; throws if the partition does not exist or on write failure.

```q
/ intraday: append each batch as it arrives
dbwrite.appenddown[`:hdb;2024.01.02;`trade;batch]

/ end-of-day: sort once when done
dbwrite.sort[(`trade;.Q.par[`:hdb;2024.01.02;`trade])]
```

---

### `loadconfig[file]`

Loads and validates the sort configuration CSV, storing the result in module state for use by `sort`.

**Parameters**

| Parameter | Type | Description |
|---|---|---|
| `file` | hsym | Path to the sort config CSV; pass null (`` ` ``) to warn and reset `params` to the default row |

**Returns** — generic null on success; throws on file/validation failure.

Validation checks that all four required columns (`tabname`, `att`, `column`, `sort`) are present and that all `att` values are within `` ``p`s`g`u ``. Throws a descriptive error for invalid files or unreadable paths.

Passing null warns at `warn` level and resets `params` to the default row — it does not throw.

```q
dbwrite.loadconfig[`:config/sort.csv]
```

---

### `sort[d]`

Sorts an on-disk table partition and applies configured attributes.

**Parameters**

| Parameter | Type | Description |
|---|---|---|
| `d` | symbol or list | Table name alone, or `(tabname;dir)`, or `(tabname;list of dirs)` — see below |

`d` forms:

| Form | Example |
|---|---|
| Symbol | `` `trade `` |
| Tabname + single dir | `` (`trade;`:hdb/2024.01.02/trade/) `` |
| Tabname + dir list | `` (`trade;`:hdb/2024.01.02/trade/ `:hdb/2024.01.03/trade/) `` |

**Returns** — generic null on success; `()` if no sort config is found for the table.

Config lookup order within the loaded params:
1. Rows where `tabname` matches — used directly.
2. Rows where `tabname = \`default` — used with a `warn` log.
3. No match — warns and returns `()`.

Sort and attribute errors are caught, logged, and swallowed.

```q
dbwrite.sort[(`trade;`:hdb/2024.01.02/trade/)]
```

---

### `applyattr[dloc;colname;att]`

Applies a single kdb+ attribute to an on-disk column.

**Parameters**

| Parameter | Type | Description |
|---|---|---|
| `dloc` | hsym | On-disk partition directory (e.g. `` `:hdb/2024.01.02/trade/ ``) |
| `colname` | symbol | Column name |
| `att` | symbol | Attribute to apply: `` `p ``, `` `s ``, `` `g ``, or `` `u `` |

**Returns** — generic null on success.

Logs the attempt before applying. On failure, logs the error and continues — does not throw.

```q
dbwrite.applyattr[`:hdb/2024.01.02/trade/;`sym;`p]
```

---

### `gc[]`

Runs `.Q.gc[]` and logs before/after memory statistics.

**Returns** — generic null.

Emits two `info`-level log lines: memory stats before collection, and bytes recovered plus memory stats after.

```q
dbwrite.gc[]
```

---

## Running tests

```q
k4unit:use`di.k4unit
k4unit.moduletest`di.dbwrite
```

The test suite uses mock logging (no `di.log` dependency required). The mock wires up three no-op counters so log call counts can be asserted:

```q
dbwrite:use`di.dbwrite
logcount:0
loginfo:{[c;m] logcount::logcount+1}
logwarn:{[c;m] logcount::logcount+1}
logerr:{[c;m] logcount::logcount+1}
logdep:`info`warn`error!(loginfo;logwarn;logerr)
deps:(enlist`log)!enlist logdep
dbwrite.init[(::);deps]
```

Tests cover: dependency injection, `init` error on missing log dep, `savedown` write and sort, `savedown` without sym column, `appenddown` append without sort, explicit `sort` after `appenddown`, `appenddown` error on non-existent partition, `sort` with default row fallback / explicit config / `default` row fallback / no-match skip / empty input / wrong type, `loadconfig` with null file / valid file / unrecognised columns / unrecognised attributes / missing file / header-only file, `applyattr` on missing path / null column / invalid attribute / valid path, `gc` log count.

---

## Exported symbols

```q
export:([init;savedown;appenddown;sort;applyattr;loadconfig;gc])
```
114 changes: 114 additions & 0 deletions di/dbwrite/dbwrite.q
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/ sort params table - default row sorts all tables by time ascending
params:([] tabname:enlist`default; att:enlist`; column:enlist`time; sort:enlist 1b);


/ load and validate sort.csv into .z.M.params
/ file: hsym path; null warns and resets params to default row
loadconfig:{[file]
if[not -11h=type file;
'"loadconfig: file must be a symbol, got type ",(string type file)];
if[null file;
.z.m.logwarn[`dbwrite;"loadconfig called with no file; resetting params to default"];
@[.z.M;`params;:;([] tabname:enlist`default; att:enlist`; column:enlist`time; sort:enlist 1b)]];
if[not null file;
file:hsym file;
p:@[
{.z.m.loginfo[`dbwrite;"retrieving sort settings from ",string x];("SSSB";enlist",")0:x};
file;
{[f;e]'"failed to open ",string[f],": ",e}[file]
];
if[not all spcb:(spc:cols p) in `tabname`att`column`sort;
'"unrecognised columns (",(", " sv string spc where not spcb),") in ",string file];
if[not all atb:(at:distinct p`att) in ``p`s`g`u;
'"unrecognised attribute(s): ",", " sv string at where not atb];
@[.z.M;`params;:;p]];
};

/ apply a single kdb+ attribute to an on-disk column; logs and swallows errors
applyattr:{[dloc;colname;att]
.z.m.loginfo[`dbwrite;"applying ",string[att]," attr to ",string[colname]," in ",string dloc];
if[null colname;
.z.m.logerr[`dbwrite;"applyattr called with null column name in ",string dloc]];
if[not null colname;
$[not att in `p`s`g`u;
.z.m.logerr[`dbwrite;"applyattr: invalid attribute ",string[att]," for ",string[colname]," in ",string dloc];
.[{@[x;y;z#]};(dloc;colname;att);
{[dloc;colname;att;e]
.z.m.logerr[`dbwrite;"unable to apply ",string[att]," attr to ",string[colname]," in ",string[dloc],": ",e]
}[dloc;colname;att]
]]];
};

/ sort an on-disk table partition and apply attributes per sort.csv config
/ d: tabname | (tabname;dir) | (tabname;list of dirs)
sort:{[d]
$[not count d;
();
not (type d) in -11 0 11h;
[.z.m.logerr[`dbwrite;"sort: d must be a symbol or list, got type ",(string type d)];()];
[
.z.m.loginfo[`dbwrite;"sorting ",(st:string t:first d)," table"];
sp:$[count tabsp:select from .z.M.params where tabname=t;
[.z.m.loginfo[`dbwrite;"sort params found for: ",st];tabsp];
count defsp:select from .z.M.params where tabname=`default;
[.z.m.logwarn[`dbwrite;"no sort params for: ",st,"; using defaults"];defsp];
[.z.m.logwarn[`dbwrite;"no sort params for: ",st,"; skipping sort"];:()]];
{[sp;dloc]
if[count sortcols:exec column from sp where sort,not null column;
.z.m.loginfo[`dbwrite;"sorting ",string[dloc]," by: ",", " sv string sortcols];
.[xasc;(sortcols;dloc);
{[sc;dl;e]
.z.m.logerr[`dbwrite;"failed to sort ",string[dl]," by ",(", " sv string sc),": ",e]
}[sortcols;dloc]]];
if[count attrcols:select column,att from sp where not null att;
.z.M.applyattr[dloc;;]'[attrcols`column;attrcols`att]];
}[sp] each distinct (),last d;
.z.m.loginfo[`dbwrite;"finished sorting ",st," table"]
]]
};


/ write table to a date-partitioned hdb: enumerate syms, apply p# to sym if present, write, then sort
/ dir: hdb root (hsym); part: partition value (date/month/int); tabname: symbol; data: in-memory table
savedown:{[dir;part;tabname;data]
.z.m.loginfo[`dbwrite;"saving ",string[tabname]," partition ",string[part]," to ",string dir];
path:` sv (.Q.par[dir;part;tabname];`);
data:.Q.en[dir;data];
path set $[`sym in cols data;@[data;`sym;{`p#x}];data];
sort[(tabname;path)];
.z.m.loginfo[`dbwrite;"finished saving ",string tabname];
};

/ append data to an existing on-disk partition; enumerate syms but do not sort
/ call sort separately when the partition is complete
/ dir: hdb root (hsym); part: partition value; tabname: symbol; data: in-memory table
appenddown:{[dir;part;tabname;data]
.z.m.loginfo[`dbwrite;"appending ",string[tabname]," partition ",string[part]," in ",string dir];
path:` sv (.Q.par[dir;part;tabname];`);
if[not count @[key;path;{`$()}];
'"appenddown: partition does not exist at ",string path];
.[path;();,;.Q.en[dir;data]];
.z.m.loginfo[`dbwrite;"finished appending ",string tabname];
};

/ format current process memory stats as a loggable string
memstats:{[]"mem stats: ",{"; "sv "=" sv'flip(string key x;(string value x),\:" MB")}`long$.Q.w[]%1048576};

/ run .Q.gc[] and log before/after memory stats
gc:{
.z.m.loginfo[`dbwrite;"starting garbage collect. ",.z.M.memstats[]];
r:.Q.gc[];
.z.m.loginfo[`dbwrite;"garbage collection returned ",(string `long$r%1048576),"MB. ",.z.M.memstats[]]
};

init:{[config;deps]

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

config argument not actually called within this function?

/ config: unused, pass (::)
/ deps: `log!(logdict) - `info`warn`error!(infofunc;warnfunc;errfunc) - required
logdict:$[99h=type deps;$[(`log in key deps) and not (::)~deps`log;deps`log;()!()];()!()];
if[not count logdict;
'"di.dbwrite: log dependency is required; pass `info`warn`error functions - see di.log or refer to confluence documentation";
];
.z.m.loginfo:logdict`info;
.z.m.logwarn:logdict`warn;
.z.m.logerr:logdict`error;
};
6 changes: 6 additions & 0 deletions di/dbwrite/init.q
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
/ dbwrite module - sort, attribute application, save-down manipulation, and GC utilities
/ used by processes that persist data to disk (rdb, wdb, tickerlogreplay)

\l ::dbwrite.q

export:([init;savedown;appenddown;sort;applyattr;loadconfig;gc])
Loading