-
Notifications
You must be signed in to change notification settings - Fork 7
Feature/dbwritemodule #95
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
Ruairi-wq2
wants to merge
7
commits into
main
Choose a base branch
from
feature/dbwritemodule
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+536
−0
Open
Changes from all commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
d1b7a7e
feature/dbwritemodule-Initial-commit
Ruairi-wq2 dac127a
Update dbwrite.md and dbwrite.q
Ruairi-wq2 b8b3914
Export savedownmanipulation, simplify sort fallback, add loadconfig n…
Ruairi-wq2 4c0099e
Add null/type guards, comprehensive edge-case tests, fix sort empty-i…
Ruairi-wq2 c2ac971
Update dbwrite.md: fix stale defaultparams references, add mock-loggi…
Ruairi-wq2 931db34
Add savedown and upsert, remove TorQ-specific features
Ruairi-wq2 7b0a787
Fix upsert naming, path construction, and add appenddown existence guard
Ruairi-wq2 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,266 @@ | ||
| # di.dbwrite | ||
|
|
||
| Write, sort, and attribute utilities for kdb+ processes that persist data to disk. | ||
|
|
||
| --- | ||
|
|
||
| ## Features | ||
|
|
||
| - Write in-memory tables to a date-partitioned HDB with `savedown` — enumerates syms, applies `p#` to `sym`, writes, then sorts | ||
| - Append rows to an existing partition with `appenddown` — enumerates syms and appends; sort separately when the partition is complete | ||
| - Sort on-disk table partitions by configured columns using `xasc` | ||
| - Apply kdb+ attributes (`p`, `s`, `g`, `u`) to on-disk columns after sort | ||
| - Sort and attribute behaviour driven by a CSV config file; a `default` row acts as a fallback | ||
| - A built-in `default` row in `params` provides an out-of-the-box fallback (sort by `time` ascending) when no config file is loaded | ||
| - Run `.Q.gc[]` with before/after memory logging | ||
| - All errors from sort, attribute application, and write operations are either caught-and-logged (sort, applyattr) or propagated to the caller (savedown, upsert) | ||
|
|
||
| --- | ||
|
|
||
| ## Dependencies | ||
|
|
||
| | Dependency | Key | Required | Description | | ||
| |---|---|---|---| | ||
| | `di.log` | `` `log `` | yes | Logging functions `info`, `warn`, `error` — each `{[ctx;msg] ...}` | | ||
|
|
||
| The `log` dependency must be passed to `init`. The module throws if it is absent or `(::)`. | ||
|
|
||
| --- | ||
|
|
||
| ## Sort config CSV | ||
|
|
||
| `loadconfig` reads a CSV with four columns: | ||
|
|
||
| | Column | Type | Description | | ||
| |---|---|---| | ||
| | `tabname` | symbol | Table name, or `` `default `` as a catch-all fallback | | ||
| | `att` | symbol | kdb+ attribute to apply: `p`, `s`, `g`, `u`, or empty for none | | ||
| | `column` | symbol | Column to sort or attribute | | ||
| | `sort` | boolean | `1b` — include in `xasc` sort key; `0b` — attribute only | | ||
|
|
||
| Example `sort.csv`: | ||
|
|
||
| ``` | ||
| tabname,att,column,sort | ||
| trade,p,sym,1 | ||
| trade,,price,0 | ||
| quote,p,sym,1 | ||
| default,,time,1 | ||
| ``` | ||
|
|
||
| Sorts `trade` by `sym`, applies `p` to `sym`. Tables not listed fall back to `default` and sort by `time`. | ||
|
|
||
| --- | ||
|
|
||
| ## Functions | ||
|
|
||
| ### Summary | ||
|
|
||
| | Function | Description | | ||
| |---|---| | ||
| | `init[config;deps]` | Wire injected dependencies; must be called first | | ||
| | `savedown[dir;part;tabname;data]` | Write in-memory table to HDB partition, enumerate syms, apply `p#sym`, then sort | | ||
| | `appenddown[dir;part;tabname;data]` | Append rows to existing partition and enumerate syms; does not sort | | ||
| | `loadconfig[file]` | Load and validate the sort config CSV into module state | | ||
| | `sort[d]` | Sort an on-disk partition and apply attributes per config | | ||
| | `applyattr[dloc;colname;att]` | Apply a single kdb+ attribute to an on-disk column | | ||
| | `gc[]` | Run `.Q.gc[]` and log before/after memory stats | | ||
|
|
||
| --- | ||
|
|
||
| ### `init[config;deps]` | ||
|
|
||
| Wires injected dependencies into the module. Must be called before any other function. | ||
|
|
||
| **Parameters** | ||
|
|
||
| | Parameter | Type | Description | | ||
| |---|---|---| | ||
| | `config` | any | Accepted but unused; pass `(::)` | | ||
| | `deps` | dict | Must contain `` `log `` → `` `info`warn`error!(infofunc;warnfunc;errfunc) `` | | ||
|
|
||
| **Returns** — generic null. | ||
|
|
||
| Throws with a descriptive message if the `log` dependency is missing or set to `(::)`. | ||
|
|
||
| ```q | ||
| log:use`di.log | ||
| log.init[logconfig] | ||
| logdep:`info`warn`error!(log.info;log.warn;log.error) | ||
|
|
||
| dbwrite:use`di.dbwrite | ||
| dbwrite.init[(::);(enlist`log)!enlist logdep] | ||
| ``` | ||
|
|
||
| --- | ||
|
|
||
| ### `savedown[dir;part;tabname;data]` | ||
|
|
||
| Writes an in-memory table to a date-partitioned HDB. Enumerates symbol columns against the HDB sym file, applies `p#` to `sym` if present, writes the partition, then calls `sort` to sort and apply attributes per the loaded config. | ||
|
|
||
| **Parameters** | ||
|
|
||
| | Parameter | Type | Description | | ||
| |---|---|---| | ||
| | `dir` | hsym | HDB root directory (e.g. `` `:hdb ``) | | ||
| | `part` | date/month/int | Partition value | | ||
| | `tabname` | symbol | Table name — determines the partition subdirectory | | ||
| | `data` | table | In-memory table to write | | ||
|
|
||
| **Returns** — generic null on success; throws on write failure. | ||
|
|
||
| If `loadconfig` has not been called, the built-in default row applies (sort by `time` ascending). If the table has no `sym` column, enumeration and `p#` are skipped. | ||
|
|
||
| ```q | ||
| dbwrite.savedown[`:hdb;2024.01.02;`trade;data] | ||
| ``` | ||
|
|
||
| --- | ||
|
|
||
| ### `appenddown[dir;part;tabname;data]` | ||
|
|
||
| Appends rows to an existing on-disk partition. Enumerates symbol columns then appends; does not sort. Call `sort` explicitly when the partition is complete. | ||
|
|
||
| Keeping sort separate allows multiple intraday appends without the cost of re-sorting a growing partition on each call. | ||
|
|
||
| **Parameters** | ||
|
|
||
| | Parameter | Type | Description | | ||
| |---|---|---| | ||
| | `dir` | hsym | HDB root directory | | ||
| | `part` | date/month/int | Partition value | | ||
| | `tabname` | symbol | Table name | | ||
| | `data` | table | Rows to append | | ||
|
|
||
| **Returns** — generic null on success; throws if the partition does not exist or on write failure. | ||
|
|
||
| ```q | ||
| / intraday: append each batch as it arrives | ||
| dbwrite.appenddown[`:hdb;2024.01.02;`trade;batch] | ||
|
|
||
| / end-of-day: sort once when done | ||
| dbwrite.sort[(`trade;.Q.par[`:hdb;2024.01.02;`trade])] | ||
| ``` | ||
|
|
||
| --- | ||
|
|
||
| ### `loadconfig[file]` | ||
|
|
||
| Loads and validates the sort configuration CSV, storing the result in module state for use by `sort`. | ||
|
|
||
| **Parameters** | ||
|
|
||
| | Parameter | Type | Description | | ||
| |---|---|---| | ||
| | `file` | hsym | Path to the sort config CSV; pass null (`` ` ``) to warn and reset `params` to the default row | | ||
|
|
||
| **Returns** — generic null on success; throws on file/validation failure. | ||
|
|
||
| Validation checks that all four required columns (`tabname`, `att`, `column`, `sort`) are present and that all `att` values are within `` ``p`s`g`u ``. Throws a descriptive error for invalid files or unreadable paths. | ||
|
|
||
| Passing null warns at `warn` level and resets `params` to the default row — it does not throw. | ||
|
|
||
| ```q | ||
| dbwrite.loadconfig[`:config/sort.csv] | ||
| ``` | ||
|
|
||
| --- | ||
|
|
||
| ### `sort[d]` | ||
|
|
||
| Sorts an on-disk table partition and applies configured attributes. | ||
|
|
||
| **Parameters** | ||
|
|
||
| | Parameter | Type | Description | | ||
| |---|---|---| | ||
| | `d` | symbol or list | Table name alone, or `(tabname;dir)`, or `(tabname;list of dirs)` — see below | | ||
|
|
||
| `d` forms: | ||
|
|
||
| | Form | Example | | ||
| |---|---| | ||
| | Symbol | `` `trade `` | | ||
| | Tabname + single dir | `` (`trade;`:hdb/2024.01.02/trade/) `` | | ||
| | Tabname + dir list | `` (`trade;`:hdb/2024.01.02/trade/ `:hdb/2024.01.03/trade/) `` | | ||
|
|
||
| **Returns** — generic null on success; `()` if no sort config is found for the table. | ||
|
|
||
| Config lookup order within the loaded params: | ||
| 1. Rows where `tabname` matches — used directly. | ||
| 2. Rows where `tabname = \`default` — used with a `warn` log. | ||
| 3. No match — warns and returns `()`. | ||
|
|
||
| Sort and attribute errors are caught, logged, and swallowed. | ||
|
|
||
| ```q | ||
| dbwrite.sort[(`trade;`:hdb/2024.01.02/trade/)] | ||
| ``` | ||
|
|
||
| --- | ||
|
|
||
| ### `applyattr[dloc;colname;att]` | ||
|
|
||
| Applies a single kdb+ attribute to an on-disk column. | ||
|
|
||
| **Parameters** | ||
|
|
||
| | Parameter | Type | Description | | ||
| |---|---|---| | ||
| | `dloc` | hsym | On-disk partition directory (e.g. `` `:hdb/2024.01.02/trade/ ``) | | ||
| | `colname` | symbol | Column name | | ||
| | `att` | symbol | Attribute to apply: `` `p ``, `` `s ``, `` `g ``, or `` `u `` | | ||
|
|
||
| **Returns** — generic null on success. | ||
|
|
||
| Logs the attempt before applying. On failure, logs the error and continues — does not throw. | ||
|
|
||
| ```q | ||
| dbwrite.applyattr[`:hdb/2024.01.02/trade/;`sym;`p] | ||
| ``` | ||
|
|
||
| --- | ||
|
|
||
| ### `gc[]` | ||
|
|
||
| Runs `.Q.gc[]` and logs before/after memory statistics. | ||
|
|
||
| **Returns** — generic null. | ||
|
|
||
| Emits two `info`-level log lines: memory stats before collection, and bytes recovered plus memory stats after. | ||
|
|
||
| ```q | ||
| dbwrite.gc[] | ||
| ``` | ||
|
|
||
| --- | ||
|
|
||
| ## Running tests | ||
|
|
||
| ```q | ||
| k4unit:use`di.k4unit | ||
| k4unit.moduletest`di.dbwrite | ||
| ``` | ||
|
|
||
| The test suite uses mock logging (no `di.log` dependency required). The mock wires up three no-op counters so log call counts can be asserted: | ||
|
|
||
| ```q | ||
| dbwrite:use`di.dbwrite | ||
| logcount:0 | ||
| loginfo:{[c;m] logcount::logcount+1} | ||
| logwarn:{[c;m] logcount::logcount+1} | ||
| logerr:{[c;m] logcount::logcount+1} | ||
| logdep:`info`warn`error!(loginfo;logwarn;logerr) | ||
| deps:(enlist`log)!enlist logdep | ||
| dbwrite.init[(::);deps] | ||
| ``` | ||
|
|
||
| Tests cover: dependency injection, `init` error on missing log dep, `savedown` write and sort, `savedown` without sym column, `appenddown` append without sort, explicit `sort` after `appenddown`, `appenddown` error on non-existent partition, `sort` with default row fallback / explicit config / `default` row fallback / no-match skip / empty input / wrong type, `loadconfig` with null file / valid file / unrecognised columns / unrecognised attributes / missing file / header-only file, `applyattr` on missing path / null column / invalid attribute / valid path, `gc` log count. | ||
|
|
||
| --- | ||
|
|
||
| ## Exported symbols | ||
|
|
||
| ```q | ||
| export:([init;savedown;appenddown;sort;applyattr;loadconfig;gc]) | ||
| ``` |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,114 @@ | ||
| / sort params table - default row sorts all tables by time ascending | ||
| params:([] tabname:enlist`default; att:enlist`; column:enlist`time; sort:enlist 1b); | ||
|
|
||
|
|
||
| / load and validate sort.csv into .z.M.params | ||
| / file: hsym path; null warns and resets params to default row | ||
| loadconfig:{[file] | ||
| if[not -11h=type file; | ||
| '"loadconfig: file must be a symbol, got type ",(string type file)]; | ||
| if[null file; | ||
| .z.m.logwarn[`dbwrite;"loadconfig called with no file; resetting params to default"]; | ||
| @[.z.M;`params;:;([] tabname:enlist`default; att:enlist`; column:enlist`time; sort:enlist 1b)]]; | ||
| if[not null file; | ||
| file:hsym file; | ||
| p:@[ | ||
| {.z.m.loginfo[`dbwrite;"retrieving sort settings from ",string x];("SSSB";enlist",")0:x}; | ||
| file; | ||
| {[f;e]'"failed to open ",string[f],": ",e}[file] | ||
| ]; | ||
| if[not all spcb:(spc:cols p) in `tabname`att`column`sort; | ||
| '"unrecognised columns (",(", " sv string spc where not spcb),") in ",string file]; | ||
| if[not all atb:(at:distinct p`att) in ``p`s`g`u; | ||
| '"unrecognised attribute(s): ",", " sv string at where not atb]; | ||
| @[.z.M;`params;:;p]]; | ||
| }; | ||
|
|
||
| / apply a single kdb+ attribute to an on-disk column; logs and swallows errors | ||
| applyattr:{[dloc;colname;att] | ||
| .z.m.loginfo[`dbwrite;"applying ",string[att]," attr to ",string[colname]," in ",string dloc]; | ||
| if[null colname; | ||
| .z.m.logerr[`dbwrite;"applyattr called with null column name in ",string dloc]]; | ||
| if[not null colname; | ||
| $[not att in `p`s`g`u; | ||
| .z.m.logerr[`dbwrite;"applyattr: invalid attribute ",string[att]," for ",string[colname]," in ",string dloc]; | ||
| .[{@[x;y;z#]};(dloc;colname;att); | ||
| {[dloc;colname;att;e] | ||
| .z.m.logerr[`dbwrite;"unable to apply ",string[att]," attr to ",string[colname]," in ",string[dloc],": ",e] | ||
| }[dloc;colname;att] | ||
| ]]]; | ||
| }; | ||
|
|
||
| / sort an on-disk table partition and apply attributes per sort.csv config | ||
| / d: tabname | (tabname;dir) | (tabname;list of dirs) | ||
| sort:{[d] | ||
| $[not count d; | ||
| (); | ||
| not (type d) in -11 0 11h; | ||
| [.z.m.logerr[`dbwrite;"sort: d must be a symbol or list, got type ",(string type d)];()]; | ||
| [ | ||
| .z.m.loginfo[`dbwrite;"sorting ",(st:string t:first d)," table"]; | ||
| sp:$[count tabsp:select from .z.M.params where tabname=t; | ||
| [.z.m.loginfo[`dbwrite;"sort params found for: ",st];tabsp]; | ||
| count defsp:select from .z.M.params where tabname=`default; | ||
| [.z.m.logwarn[`dbwrite;"no sort params for: ",st,"; using defaults"];defsp]; | ||
| [.z.m.logwarn[`dbwrite;"no sort params for: ",st,"; skipping sort"];:()]]; | ||
| {[sp;dloc] | ||
| if[count sortcols:exec column from sp where sort,not null column; | ||
| .z.m.loginfo[`dbwrite;"sorting ",string[dloc]," by: ",", " sv string sortcols]; | ||
| .[xasc;(sortcols;dloc); | ||
| {[sc;dl;e] | ||
| .z.m.logerr[`dbwrite;"failed to sort ",string[dl]," by ",(", " sv string sc),": ",e] | ||
| }[sortcols;dloc]]]; | ||
| if[count attrcols:select column,att from sp where not null att; | ||
| .z.M.applyattr[dloc;;]'[attrcols`column;attrcols`att]]; | ||
| }[sp] each distinct (),last d; | ||
| .z.m.loginfo[`dbwrite;"finished sorting ",st," table"] | ||
| ]] | ||
| }; | ||
|
|
||
|
|
||
| / write table to a date-partitioned hdb: enumerate syms, apply p# to sym if present, write, then sort | ||
| / dir: hdb root (hsym); part: partition value (date/month/int); tabname: symbol; data: in-memory table | ||
| savedown:{[dir;part;tabname;data] | ||
| .z.m.loginfo[`dbwrite;"saving ",string[tabname]," partition ",string[part]," to ",string dir]; | ||
| path:` sv (.Q.par[dir;part;tabname];`); | ||
| data:.Q.en[dir;data]; | ||
| path set $[`sym in cols data;@[data;`sym;{`p#x}];data]; | ||
| sort[(tabname;path)]; | ||
| .z.m.loginfo[`dbwrite;"finished saving ",string tabname]; | ||
| }; | ||
|
|
||
| / append data to an existing on-disk partition; enumerate syms but do not sort | ||
| / call sort separately when the partition is complete | ||
| / dir: hdb root (hsym); part: partition value; tabname: symbol; data: in-memory table | ||
| appenddown:{[dir;part;tabname;data] | ||
| .z.m.loginfo[`dbwrite;"appending ",string[tabname]," partition ",string[part]," in ",string dir]; | ||
| path:` sv (.Q.par[dir;part;tabname];`); | ||
| if[not count @[key;path;{`$()}]; | ||
| '"appenddown: partition does not exist at ",string path]; | ||
| .[path;();,;.Q.en[dir;data]]; | ||
| .z.m.loginfo[`dbwrite;"finished appending ",string tabname]; | ||
| }; | ||
|
|
||
| / format current process memory stats as a loggable string | ||
| memstats:{[]"mem stats: ",{"; "sv "=" sv'flip(string key x;(string value x),\:" MB")}`long$.Q.w[]%1048576}; | ||
|
|
||
| / run .Q.gc[] and log before/after memory stats | ||
| gc:{ | ||
| .z.m.loginfo[`dbwrite;"starting garbage collect. ",.z.M.memstats[]]; | ||
| r:.Q.gc[]; | ||
| .z.m.loginfo[`dbwrite;"garbage collection returned ",(string `long$r%1048576),"MB. ",.z.M.memstats[]] | ||
| }; | ||
|
|
||
| init:{[config;deps] | ||
| / config: unused, pass (::) | ||
| / deps: `log!(logdict) - `info`warn`error!(infofunc;warnfunc;errfunc) - required | ||
| logdict:$[99h=type deps;$[(`log in key deps) and not (::)~deps`log;deps`log;()!()];()!()]; | ||
| if[not count logdict; | ||
| '"di.dbwrite: log dependency is required; pass `info`warn`error functions - see di.log or refer to confluence documentation"; | ||
| ]; | ||
| .z.m.loginfo:logdict`info; | ||
| .z.m.logwarn:logdict`warn; | ||
| .z.m.logerr:logdict`error; | ||
| }; | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| / dbwrite module - sort, attribute application, save-down manipulation, and GC utilities | ||
| / used by processes that persist data to disk (rdb, wdb, tickerlogreplay) | ||
|
|
||
| \l ::dbwrite.q | ||
|
|
||
| export:([init;savedown;appenddown;sort;applyattr;loadconfig;gc]) |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
config argument not actually called within this function?