diff --git a/.log b/.log new file mode 100644 index 00000000..fe91fb44 Binary files /dev/null and b/.log differ diff --git a/di/async/test.csv b/di/async/test.csv deleted file mode 100644 index 27a4c5bb..00000000 --- a/di/async/test.csv +++ /dev/null @@ -1,27 +0,0 @@ -action,ms,bytes,lang,code,repeat,minver,comment -before,0,0,q,async:use`di.async,1,1,load module into session - -comment,,,,,,,Testing async.deferred -run,0,0,q,.test.bdefres1:async.deferred[h;({x+1};1)],1,,testing async.deferred call that should execute on both servers -true,0,0,q,.test.bdefres1~((1b;1b);(2;2)),1,,successful async.deferred call with expected result -run,0,0,q,.test.bdefres2:async.deferred[h;({x+`a};1)],1,,testing async.deferred call that should fail -true,0,0,q,not any .test.bdefres2[0;],1,,async.deferred call failed on both servers with expected result -run,0,0,q,.test.bdefres3:async.deferred[h;(`f;1)],1,,testing async.deferred call that should fail on one server -true,0,0,q,10b ~ .test.bdefres3[0;],1,,async.deferred call failed on one server with expected result -run,0,0,q,.test.bdefres3:async.deferred[h;(`f;1)],1,,testing async.deferred call that should fail on one server -true,0,0,q,10b ~ .test.bdefres3[0;],1,,async.deferred call failed on one server with expected result - -comment,,,,,,,Testing async.postback -run,0,0,q,".test.bpbrestab:([]handle:();time:();result:())",1,,define table .test.bpbrestab to store results from async.postback -run,0,0,q,".test.storeresult:{`.test.bpbrestab upsert (.z.w;.z.t;enlist x)}",1,,define a function .test.storeresult to store the posted back results in .test.bpbrestab -run,0,0,q,async.postback[h;({x+1};2);`.test.storeresult],1,,testing async.postback that should execute on both servers -run,0,0,q,@[;"";()] each h,1,,wait for server response -true,0,0,q,(enlist 3;enlist 3) ~ -2#exec result from .test.bpbrestab,1,,successful async.postback with expected result -run,0,0,q,async.postback[h;({x+`a};2);`.test.storeresult],1,,testing async.postback call that should fail on both servers -run,0,0,q,@[;"";()] each h,1,,wait for server response -true,0,0,q,all all (exec result from -2#.test.bpbrestab) like\: "error*",1,,async.postback call failed on both servers with expected result -run,0,0,q,async.postback[h;(`f;1);`.test.storeresult],1,,testing async.postback call that should fail on one server -run,0,0,q,@[;"";()] each h,1,,wait for server response -true,0,0,q,(enlist 2;enlist "error: server fail: f") ~ exec result from -2#.test.bpbrestab,1,,async.postback call failed on one server with expected result - -run,0,0,q,@[;"exit 0";()] each neg h,,1,,closing remaining server process \ No newline at end of file diff --git a/di/asyncdispatch/asyncdispatch.md b/di/asyncdispatch/asyncdispatch.md new file mode 100644 index 00000000..dbe5ee3c --- /dev/null +++ b/di/asyncdispatch/asyncdispatch.md @@ -0,0 +1,191 @@ +# asyncdispatch + +`asyncdispatch.q` is the async multi-process query coordinator extracted from the TorQ gateway's core engine (`.gw.*`). It queues client queries, dispatches them to available backend ("server") processes, collects the per-server results, applies a join function, and replies to the client — with timeout management and error propagation if a backend disconnects mid-query. + +**Standalone value:** any process that needs scatter-gather across multiple backend processes (not just a "gateway") can load this module to get queueing, dispatch, join and timeout handling for free. + +**Out of scope** (left to other modules in the gateway decomposition): +- **Routing** — deciding *which* servers satisfy a query is `di.serverselect`'s job. This module is handed a resolved list of servertypes and dispatches to whatever of those is currently idle. +- **Permissions** — `.pm.*`/`execas`/`valp`-style checks belong in `di.gateway`/`di.permissions`. +- **EOD / discovery / dashboard & REST handlers** — `di.gateway` concerns. + +--- + +## Loading + +```q +ad:use`di.asyncdispatch +``` + +Returns a handle (`ad`) exposing the exported API, e.g. `ad.execquery[...]`. + +--- + +## Configuration & pluggable hooks + +| Variable | Default | Setter | Purpose | +|---|---|---|---| +| `errorprefix` | `"error: "` | — | Prefix added to error strings sent back to clients | +| `querykeeptime` | `0D00:30` | — | How long `removequeries` keeps finished queries in `queryqueue` | +| `clearinactivetime` | `0D01:00` | — | How long `removeinactive` keeps records for disconnected servers | +| `synccallsallowed` | `0b` | — | Whether `execquery[...;1b]` (sync mode) is permitted | +| `cp` | `{.z.p}` | `setcp` | Current-time function. Override for simulation/backtesting | +| `formatresponse` | see below | `setformatresponse` | Final transform applied to a result/error before it is sent to the client | +| `availableservers` | built-in | `setavailableservers` | How "which servers are free" is computed | +| `getnextqueryid` | `fifo` | `setgetnextqueryid` | Scheduler — picks the next query to dispatch | +| `resultcallback` / `errorcallback` | `` `addserverresult `` / `` `addservererror `` | `setcallbacks` | Symbols backend servers call back into to report results/errors | + +### `formatresponse[status;sync;result]` +- `status` — `1b` success, `0b` error. +- `sync` — `1b` if the client used a deferred-sync call (`execquery[...;1b]`), `0b` for async. +- Default: on a synchronous error (`not status` and `sync`), **signals** `result` as an error back up the `-30!` deferred response so the client's sync call raises. Otherwise passes `result` through unchanged. + +### `availableservers[excludeinuse]` +- `excludeinuse=1b` → only **idle, active** servers. +- `excludeinuse=0b` → all **active** servers. +- Override for custom load-balancing. + +### `getnextqueryid[]` (scheduler) +- Default: oldest runnable query first (FIFO) — a runnable query is one where all required servertypes have an idle server available. +- Override `getnextqueryid` for priority queues. The override receives no arguments and must return a 1-row (or empty) table with the `queryqueue` schema. + +### `resultcallback` / `errorcallback` +`serverexecute` (the function sent to backend servers) posts its outcome back via `(resultcallback;queryid;result)` / `(errorcallback;queryid;error)` over `neg .z.w`. Since this module has no fixed namespace path when used standalone, the consumer must point these symbols at wherever it mounted this module on the **backend** processes, e.g.: + +```q +ad.setcallbacks[`.gw.dispatch.addserverresult;`.gw.dispatch.addservererror] +``` + +--- + +## Core data structures + +### `servers` (keyed table, key: `handle`) + +| Column | Type | Description | +|---|---|---| +| `handle` | `int` | Connection handle to the backend process | +| `servertype` | `symbol` | e.g. `` `rdb ``, `` `hdb `` | +| `inuse` | `boolean` | Currently running a query | +| `active` | `boolean` | Connected (`0b` once disconnected) | +| `disconnecttime` | `timestamp` | When the server was marked inactive; null while active | + +### `queryqueue` (keyed table, key: `queryid`) + +| Column | Type | Description | +|---|---|---| +| `queryid` | `long` | Allocated by `addquery` | +| `time` | `timestamp` | When the query was queued | +| `clienth` | `int` | `.z.w` of the requesting client | +| `query` | `()` | The query payload — whatever `value` can execute | +| `servertype` | `()` | List of servertype symbols the query must run on | +| `join` | `()` | Function applied to the collected per-server results | +| `postback` | `()` | `()` for a plain reply, else `(function;extra args...)` to wrap the reply | +| `timeout` | `timespan` | `0Wn` for none | +| `returntime` | `timestamp` | Set by `finishquery` once complete | +| `error` | `boolean` | `1b` if the query ended in error | +| `sync` | `boolean` | `1b` if the client is waiting on a deferred (`-30!`) response | + +### `clients` (table) + +| Column | Type | Description | +|---|---|---| +| `time` | `timestamp` | Connection time | +| `clienth` | `int` | `.z.w` of the client | +| `user` | `symbol` | `.z.u` | +| `ip` | `int` | `.z.a` | +| `host` | `symbol` | `.z.h` | + +### `results` (dict) + +`queryid -> (clienth; servertype!(handle;result;received))` + +- Keys are the servertype symbols passed to `addquery`. +- Each value is a 3-list: `(handle assigned; result once it arrives; received flag)`. +- A query is complete once every `received` flag is `1b`. +- Entries are removed by `finishquery` once the query completes. + +--- + +## Functions + +### Server registry +- **`addserver[handle;servertype]`** — register a backend connection. +- **`availableservers[excludeinuse]`** — query which servers can take work now (see Pluggable hooks). + +### Client tracking +- **`addclientdetails[h]`** — call from `.z.po` to record a new client connection in `clients`. +- **`removeclienthandle[h]`** — call from `.z.pc`. Stamps any of that client's unfinished queries as returned/errored and drops their `results` entries. + +### Queueing +- **`addquery[query;servertype;join;postback;timeout;sync]`** — low-level: insert a row into `queryqueue`. Does **not** dispatch — call `runnextquery[]` afterwards (or use `execquery` which does both). +- **`removequeries[age]`** — purge `queryqueue` rows with `returntime` older than `age`. + +### Scheduling +- **`getnextqueryid[]`** — returns the next runnable query (FIFO by default: oldest query for which all required servertypes have an idle server). Override via `setgetnextqueryid` for priority queues. + +### Result collection & joining +- **`addserverresult[qid;data]`** — called when a backend posts back success. Fills slot, frees server, tries to run the next query; once all slots are received applies `join`, sends the reply, and finishes the query. +- **`addservererror[qid;err]`** — called when a backend posts back an error. Sends the error to the client and finishes the query. + +### Dispatch +- **`serverexecute[qid;query]`** — **runs on the backend**. Executes `value query`, trapping errors, and posts the outcome back to the dispatcher via `resultcallback`/`errorcallback`. +- **`runnextquery[]`** — picks the next runnable query via `getnextqueryid`, resolves idle servers, and dispatches. + +### Timeouts & disconnects +- **`checktimeout[]`** — finds queries past their `timeout` with no `returntime`, sends a timeout error, and finishes them. +- **`removeserverhandle[serverh]`** — call from `.z.pc` for **backend** handles. Errors any in-flight or queued queries that depended on this server, marks the server `active:0b`, and calls `runnextquery[]`. +- **`removeinactive[age]`** — purge `servers` rows that have been inactive for longer than `age`. + +### Public API +- **`execquery[query;servertype;join;postback;timeout;sync]`** — queue + dispatch. `sync=0b` for async (uses `postback`); `sync=1b` for deferred sync via `-30!` (`postback` ignored, errors if `synccallsallowed` is `0b`). + +### Housekeeping +- **`init[timerrepeat]`** — optionally wires `removequeries`, `checktimeout` and `removeinactive` into a recurring timer. `timerrepeat` should have the signature `.timer.repeat[starttime;endtime;period;(func;params);description]`, or pass `(::)` to skip. + +--- + +## Example usage + +```q +/ -- dispatcher process -- +ad:use`di.asyncdispatch + +/ point backends' callbacks at this module's mount point on this process +ad.setcallbacks[`ad.addserverresult;`ad.addservererror] + +/ register backend connections as they connect +h:hopen`:backend1:5001 +ad.addserver[h;`rdb] + +/ track gateway clients +.z.po:{ad.addclientdetails[.z.w]} +.z.pc:{ad.removeclienthandle[.z.w]; ad.removeserverhandle[.z.w]} + +/ a client calls this asynchronously: +ad.execquery["select count i by sym from trade";enlist`rdb;raze;();0Wn;0b] +/ -> queues the query, dispatches it to the rdb, and (once the rdb replies) +/ razes the single result and sends it back to the calling client + +/ housekeeping - run periodically (e.g. via di.timer) +ad.checktimeout[] +ad.removequeries[ad.querykeeptime] +ad.removeinactive[ad.clearinactivetime] +``` + +```q +/ -- backend process -- +ad:use`di.asyncdispatch +/ nothing else required: when the dispatcher sends (serverexecute;qid;query), +/ this process runs value query and posts the result/error back via +/ ad.addserverresult / ad.addservererror on the dispatcher +``` + +--- + +## Notes + +- `.z.M.` is used for in-place mutation of tables (`upsert`/`insert`/`update from`/`delete from`), and `.z.m.:value` for whole-variable reassignment — the same convention used by `di.cache`. +- Module globals referenced inside q-sql expressions (WHERE conditions, UPDATE SET values) must use `.z.m.varname` form, since q-sql evaluates globals in the calling context rather than the module namespace. +- `servertype` on `queryqueue`/`addquery` is deliberately generic: it is a list of servertype symbols used to look up idle servers of each type. Pass the same servertype list to `addserver` and `addquery` to wire them together. +- This module does not open or accept any connections itself — `addserver` and the `.z.po`/`.z.pc` wiring are the consumer's responsibility, by design (keeps this module dependency-free and testable in-process). diff --git a/di/asyncdispatch/asyncdispatch.q b/di/asyncdispatch/asyncdispatch.q new file mode 100644 index 00000000..a7e0d40a --- /dev/null +++ b/di/asyncdispatch/asyncdispatch.q @@ -0,0 +1,159 @@ +/ di.asyncdispatch - async scatter-gather query coordinator. +/ Queues queries, dispatches to available backends, collects results per server, +/ applies a join function, and replies to the client. +/ Routing (which servers satisfy a query) is di.serverselect's responsibility. + +errorprefix:"error: "; +querykeeptime:0D00:30; +clearinactivetime:0D01:00; +synccallsallowed:0b; + +cp:{.z.p} / injectable clock; swap out in tests to control time +setcp:{.z.m.cp:x} / allows runtime replacement of the clock without editing the module + +formatresponse:{[status;sync;result]$[not[status]and sync;'result;result]} / sync errors must be signalled with ' so the client receives a trapped error; async errors pass through unchanged +setformatresponse:{.z.m.formatresponse:x} / override reply formatting without editing the module + +/ symbols backend servers call back via; set to wherever this module is mounted +resultcallback:`addserverresult; / stored as symbol so the name survives IPC serialization to backend processes +errorcallback:`addservererror; +setcallbacks:{[resfn;errfn].z.m.resultcallback:resfn;.z.m.errorcallback:errfn} / update callback symbols when module is mounted under a non-default namespace + +/ registered backend servers +servers:([handle:`u#`int$()] servertype:`symbol$(); inuse:`boolean$(); active:`boolean$(); disconnecttime:`timestamp$()) + +/ pending and in-flight client queries +queryqueue:([queryid:`u#`long$()] time:`timestamp$(); clienth:`int$(); query:(); servertype:(); join:(); postback:(); timeout:`timespan$(); returntime:`timestamp$(); error:`boolean$(); sync:`boolean$()) + +/ connected client tracking +clients:([] time:`timestamp$(); clienth:`int$(); user:`symbol$(); ip:`int$(); host:`symbol$()) + +/ per-query result accumulator: queryid -> (clienth; servertype!(handle;result;done)) +results:()!() + +queryid:0; + +addserver:{[h;st].z.M.servers upsert (h;st;0b;1b;0Np)} / register a backend handle and servertype so it becomes eligible for dispatch + +availableservers:{[excludeinuse] / centralise the active+idle filter so dispatch and routing share one definition + $[excludeinuse; + select from servers where active, not inuse; + select from servers where active]}; +setavailableservers:{.z.m.availableservers:x} / swap in a custom routing strategy without forking core dispatch + +addclientdetails:{[h].z.M.clients insert (cp[];h;.z.u;.z.a;.z.h)} / record client identity on connect for audit and orphan-query cleanup on disconnect + +removeclienthandle:{[h] / on client disconnect, mark their pending queries errored so result slots are not leaked + update error:1b,returntime:.z.m.cp[] from .z.M.queryqueue where clienth=h, null returntime; + .z.m.results:(exec queryid from .z.m.queryqueue where clienth=h)_results}; + +addquery:{[query;servertype;join;postback;timeout;sync] / enqueue a query without dispatching; caller must call runnextquery[] to trigger dispatch + .z.M.queryqueue upsert (queryid;cp[];.z.w;query;servertype;join;{$[11h=type x;enlist x;x]}postback;timeout;0Np;0b;sync); + .z.m.queryid:queryid+1}; + +removequeries:{[age] / prevent queryqueue growing unboundedly; purge completed queries older than age + .z.m.queryqueue:0!delete from .z.m.queryqueue where not null returntime, .z.m.cp[]>returntime+age}; + +getnextqueryid:{ / pick the oldest FIFO-eligible query whose required servertypes are all currently idle + avail:exec distinct servertype from availableservers 1b; + runnable:0!select from .z.m.queryqueue where null returntime, not queryid in key .z.m.results, {all x in y}[;avail] each servertype; + 1 sublist select from runnable where time=min time}; +setgetnextqueryid:{.z.m.getnextqueryid:x} / inject a priority or custom scheduling strategy without forking the module + +addserverresult:{[qid;data] / fill one result slot; once all slots for a query are filled, run the join function and reply to the client + if[not qid in key results;:()]; + st:first exec servertype from .z.m.servers where handle=.z.w; + slots:results[qid;1]; + slots[st]:(.z.w;data;1b); + results[qid]:(results[qid;0];slots); + update inuse:0b from .z.M.servers where handle in .z.w; + runnextquery[]; + if[not qid in key results;:()]; + vals:value results[qid;1]; + if[not all vals[;2];:()]; + qd:queryqueue[qid]; + res:@[{(0b;x y)}[qd`join];vals[;1];{(1b;errorprefix,"join failed: ",x)}]; + sendclientreply[qid;last res;not res 0]; + finishquery[qid;res 0]}; + +addservererror:{[qid;err] / short-circuit a query on backend failure; free the server and notify the client before moving on + sendclientreply[qid;errorprefix,err;0b]; + update inuse:0b from .z.M.servers where handle in .z.w; + runnextquery[]; + finishquery[qid;1b]}; + +sendclientreply:{[qid;result;status] / deliver result or error to the client, handling sync vs async send and postback wrapping in one place + qd:queryqueue[qid]; + if[qd`error;:()]; + tosend:$[()~qd`postback;result;qd[`postback],enlist[qd`query],enlist result]; + $[qd`sync; + @[-30!;(qd`clienth;not status;$[status;formatresponse[1b;1b;result];result]);{}]; + @[neg qd`clienth;formatresponse[status;0b;tosend];()]]}; + +finishquery:{[qid;err] / remove query from the live results accumulator and stamp its completion time; keeps queryqueue and results consistent + .z.m.results:(qid,())_results; + update error:err,returntime:.z.m.cp[] from .z.M.queryqueue where queryid in qid}; + +serverexecute:{[qid;query] / runs on the backend; traps errors locally so a crash posts an error reply rather than silently dropping the result + res:@[{(0b;value x)};query;{(1b;"server ",(string .z.h),":",(string system"p"),": ",x)}]; + @[neg .z.w;$[res 0;(errorcallback;qid;res 1);(resultcallback;qid;res 1)]; + {@[neg .z.w;(errorcallback;x;"failed to return result: ",y);()]}[qid]]}; + +sendquerytoserver:{[qid;query;handles] / fan the query out to all required handles and mark them in-use atomically to prevent double-dispatch + (neg handles,:())@\:(serverexecute;qid;query); + update inuse:1b from .z.M.servers where handle in handles}; + +runnextquery:{ / pick the next dispatchable query and fan out to one idle server per required servertype; called after any state change that may unblock work + if[0=count torun:getnextqueryid[];:()]; + torun:first torun; + avail:exec first handle by servertype from availableservers 1b; + types:torun`servertype; + handles:avail types; + qid:torun`queryid; + slots:types!(count[types],())#enlist(0Ni;(::);0b); + slots[types;0]:handles; + results[qid]:(torun`clienth;slots); + sendquerytoserver[qid;torun`query;handles]}; + +checktimeout:{ / periodic scan to error queries that have waited beyond their timeout, preventing them from hanging indefinitely + qids:exec queryid from .z.m.queryqueue where not timeout=0Wn, null returntime, .z.m.cp[]>time+timeout; + if[count qids; + sendclientreply[;errorprefix,"query timed out";0b] each qids; + finishquery[qids;1b]]}; + +removeserverhandle:{[serverh] / on backend disconnect, error in-flight queries using that handle and queued queries that can no longer be satisfied + if[null st:first exec servertype from .z.m.servers where handle=serverh;:()]; + err:errorprefix,"backend ",string[st]," server disconnected"; + + / in-flight: queries where this handle was assigned to a slot + qids:where {[h;qid]h in value[.z.m.results[qid;1]][;0]}[serverh] each key .z.m.results; + sendclientreply[;err," during query";0b] each qids; + finishquery[qids;1b]; + + / queued: queries that can no longer be satisfied by remaining active servers + activetypes:exec distinct servertype from .z.m.servers where active, handle<>serverh; + qids2:exec queryid from .z.m.queryqueue where null returntime, not queryid in key .z.m.results, + not {all x in y}[;activetypes] each servertype; + sendclientreply[;err,", query cannot be satisfied";0b] each qids2; + finishquery[qids2;1b]; + + update active:0b,disconnecttime:.z.m.cp[] from .z.M.servers where handle=serverh; + runnextquery[]}; + +removeinactive:{[age]delete from .z.M.servers where not active, .z.m.cp[]>disconnecttime+age} / prune stale disconnected-server rows to stop the servers table growing forever + +execquery:{[query;servertype;join;postback;timeout;sync] / public entry point; validate sync constraints then enqueue and kick dispatch + if[sync; + if[not synccallsallowed;'"syncexec: synchronous calls are not allowed"]; + if[not @[{-30!x;1b};(::);0b];'"syncexec: deferred response not supported on this connection"]; + .[{[q;s;j;t]addquery[q;s;j;();t;1b];runnextquery[]};(query;servertype;join;timeout);{-30!(.z.w;1b;x)}]; + :()]; + addquery[query;servertype;join;postback;timeout;0b]; + runnextquery[]}; + +/ wire housekeeping into a timer - pass (::) to skip +init:{[timerrepeat] / wire recurring housekeeping (timeout scan, query purge, server purge) into a provided timer; pass (::) to skip registration + if[not timerrepeat~(::); + timerrepeat[cp[];0Wp;0D00:05:00;(.z.m.removequeries;querykeeptime);"asyncdispatch: remove old queries"]; + timerrepeat[cp[];0Wp;0D00:00:05;(.z.m.checktimeout;`);"asyncdispatch: timeout expired queries"]; + timerrepeat[cp[];0Wp;0D00:05:00;(.z.m.removeinactive;clearinactivetime);"asyncdispatch: remove inactive servers"]]}; diff --git a/di/asyncdispatch/init.q b/di/asyncdispatch/init.q new file mode 100644 index 00000000..f2a53a3b --- /dev/null +++ b/di/asyncdispatch/init.q @@ -0,0 +1,11 @@ +\l ::asyncdispatch.q + +export:([ + errorprefix;querykeeptime;clearinactivetime;synccallsallowed; / user-tunable config: error text prefix and housekeeping intervals + setcp;setformatresponse;setcallbacks;setavailableservers;setgetnextqueryid; / user-injectable overrides: clock, reply format, callback namespace, routing and scheduling + addserver;removeserverhandle;availableservers; / server lifecycle: register, deregister and inspect available backends + addclientdetails;removeclienthandle; / client lifecycle: wire into .z.po / .z.pc + addserverresult;addservererror; / IPC return paths: backends resolve these by name to deliver results or errors + execquery; / public API: only entry point for submitting a query + servers;queryqueue;clients; / observable state (tables): monitoring and diagnostics + init]) / startup: wires housekeeping into the provided timer diff --git a/di/asyncdispatch/test.csv b/di/asyncdispatch/test.csv new file mode 100644 index 00000000..a55cc318 --- /dev/null +++ b/di/asyncdispatch/test.csv @@ -0,0 +1,66 @@ +action,ms,bytes,lang,code,repeat,minver,comment +before,0,0,q,ad:use`di.asyncdispatch,1,,load module +before,0,0,q,srv:{.m.di.0asyncdispatch.servers},1,,live servers table +before,0,0,q,qq:{.m.di.0asyncdispatch.queryqueue},1,,live queryqueue table +before,0,0,q,cl:{.m.di.0asyncdispatch.clients},1,,live clients table +before,0,0,q,res:{.m.di.0asyncdispatch.results},1,,live results dict + +/ Test 0: server registry +run,0,0,q,ad.addserver[0i;`mock],1,,register a mock server +true,0,0,q,(enlist 0i)~exec handle from srv[] where active,1,,server is active +true,0,0,q,(enlist 0i)~exec handle from ad.availableservers 1b,1,,idle server is available + +/ Test 1: queueing and FIFO scheduling +run,0,0,q,ad.addquery["2+2";enlist`mock;raze;();0Wn;0b],1,,queue a query for the mock servertype +true,0,0,q,1~count select from qq[] where query~\:"2+2",1,,query was added to the queue +true,0,0,q,1~count ad.getnextqueryid[],1,,query is runnable - idle mock server available +true,0,0,q,(enlist`mock)~first exec servertype from ad.getnextqueryid[],1,,getnextqueryid identifies the available servertype + +/ Test 2: full dispatch -> result -> join -> reply with IPC stubbed out +run,0,0,q,.m.di.0asyncdispatch.sendquerytoserver:{[qid;qry;sh] ad.addserverresult[qid;value qry]},1,,stub dispatch: run inline and feed result back via addserverresult +run,0,0,q,qid1:exec first queryid from qq[] where query~\:"2+2",1,,capture the queued query id +run,0,0,q,ad.runnextquery[],1,,dispatch - stub executes and feeds result through addserverresult/checkresults +true,0,0,q,0b~first exec error from qq[] where queryid=qid1,1,,query completed without error +true,0,0,q,not null first exec returntime from qq[] where queryid=qid1,1,,returntime is stamped +true,0,0,q,not qid1 in key res[],1,,result accumulator cleaned up after join + +/ Test 3: checktimeout +run,0,0,q,ad.addquery["3+3";enlist`mock;raze;();0D00:00:00.000000001;0b],1,,queue with a near-zero timeout +run,0,0,q,qid2:exec first queryid from qq[] where query~\:"3+3",1,,capture the queued query id +run,0,0,q,ad.checktimeout[],1,,timeout sweep should catch the expired query +true,0,0,q,1b~first exec error from qq[] where queryid=qid2,1,,timed-out query is flagged as error +true,0,0,q,not null first exec returntime from qq[] where queryid=qid2,1,,timed-out query has returntime stamped + +/ Test 4: removequeries purges completed queries +run,0,0,q,ad.setcp[{.z.p+2D}],1,,fast-forward the clock by 2 days +run,0,0,q,ad.removequeries ad.querykeeptime,1,,purge queries older than querykeeptime +true,0,0,q,0~count select from qq[] where queryid in qid1,qid2,1,both completed queries purged +run,0,0,q,ad.setcp[{.z.p}],1,,restore the real clock + +/ Test 5: removeserverhandle errors queued queries when the only server disconnects +run,0,0,q,ad.addserver[1i;`mock2],1,,register a second mock server +run,0,0,q,ad.addquery["4+4";enlist`mock2;raze;();0Wn;0b],1,,queue a query that can only run on mock2 +run,0,0,q,qid3:exec first queryid from qq[] where query~\:"4+4",1,,capture the queued query id +run,0,0,q,ad.removeserverhandle[1i],1,,disconnect the only server able to run the query +true,0,0,q,1b~first exec error from qq[] where queryid=qid3,1,,query errored as no server can satisfy it +true,0,0,q,0b~first exec active from srv[] where handle=1i,1,,disconnected server marked inactive + +/ Test 6: removeinactive purges inactive server records +run,0,0,q,ad.setcp[{.z.p+2D}],1,,fast-forward clock past clearinactivetime +run,0,0,q,ad.removeinactive[ad.clearinactivetime],1,,purge inactive servers +true,0,0,q,0~count select from srv[] where handle=1i,1,,inactive server record removed +run,0,0,q,ad.setcp[{.z.p}],1,,restore real clock + +/ Test 7: client tracking +run,0,0,q,ad.addclientdetails[2i],1,,record a connected client +true,0,0,q,1~count select from cl[] where clienth=2i,1,,client connection is tracked +run,0,0,q,ad.removeclienthandle[2i],1,,disconnect the client +true,0,0,q,0~count select from qq[] where clienth=2i,null returntime,1,client unfinished queries cleared + +/ Test 8: pluggable hooks +run,0,0,q,ad.setformatresponse[{[status;sync;result]result}],1,,override formatresponse to pass result through +true,0,0,q,5~ad.formatresponse[1b;0b;5],1,,overridden formatresponse is used +run,0,0,q,ad.setgetnextqueryid[{()}],1,,override scheduler to never pick a query +true,0,0,q,0~count ad.getnextqueryid[],1,,overridden scheduler is used +run,0,0,q,ad.setcallbacks[`myresult;`myerror],1,,override callback symbols +true,0,0,q,(`myresult;`myerror)~(.m.di.0asyncdispatch.resultcallback;.m.di.0asyncdispatch.errorcallback),1,,callback symbols updated diff --git a/di/async/async.md b/di/asyncutil/asyncutil.md similarity index 88% rename from di/async/async.md rename to di/asyncutil/asyncutil.md index aadb31fd..9b0f159a 100644 --- a/di/async/async.md +++ b/di/asyncutil/asyncutil.md @@ -22,28 +22,28 @@ If either of these are carried out via asynchronous broadcast, the request will Note, in each of the examples below handles is a list of two handles to different server processes -##### async.deferred +##### asyncutil.deferred Can be used to make deferred synchronous calls via asynchronous broadcast. It will send the query down each of the handles, then block and wait on the handles The result set is of the form (successvector each handle; result vector) Note, that if there is an issue with any of the handles, the query won't be sent down any handle ```q -// async.deferred[handles;query] -q)async.deferred[handles;"2+2"] +// asyncutil.deferred[handles;query] +q)asyncutil.deferred[handles;"2+2"] 1 1 4 4 ``` -##### async.postback +##### asyncutil.postback Can be used to make asynchronous postback calls via asynchronous broadcast. Wrap the supplied query in a postback function Don't block the handle when waiting Success vector is returned that it has been sent correctly The result is then returned once executed by the server, although it is not wrapped in the status -Similar to async.deferred, if there is an issue with any of the handles, the query won't be sent down any handle +Similar to asyncutil.deferred, if there is an issue with any of the handles, the query won't be sent down any handle ```q -// async.postback[handles;query;postback] -q)async.postback[handles;"2+2";{show x}] +// asyncutil.postback[handles;query;postback] +q)asyncutil.postback[handles;"2+2";{show x}] 11b 4 4 diff --git a/di/async/async.q b/di/asyncutil/asyncutil.q similarity index 100% rename from di/async/async.q rename to di/asyncutil/asyncutil.q diff --git a/di/async/init.q b/di/asyncutil/init.q similarity index 63% rename from di/async/init.q rename to di/asyncutil/init.q index 1455cbcb..d0950d9a 100644 --- a/di/async/init.q +++ b/di/asyncutil/init.q @@ -1,3 +1,3 @@ -\l ::async.q +\l ::asyncutil.q export:([deferred;postback]) \ No newline at end of file diff --git a/di/asyncutil/test.csv b/di/asyncutil/test.csv new file mode 100644 index 00000000..791466ba --- /dev/null +++ b/di/asyncutil/test.csv @@ -0,0 +1,27 @@ +action,ms,bytes,lang,code,repeat,minver,comment +before,0,0,q,asyncutil:use`di.asyncutil,1,1,load module into session + +comment,,,,,,,Testing asyncutil.deferred +run,0,0,q,.test.bdefres1:asyncutil.deferred[h;({x+1};1)],1,,testing asyncutil.deferred call that should execute on both servers +true,0,0,q,.test.bdefres1~((1b;1b);(2;2)),1,,successful asyncutil.deferred call with expected result +run,0,0,q,.test.bdefres2:asyncutil.deferred[h;({x+`a};1)],1,,testing asyncutil.deferred call that should fail +true,0,0,q,not any .test.bdefres2[0;],1,,asyncutil.deferred call failed on both servers with expected result +run,0,0,q,.test.bdefres3:asyncutil.deferred[h;(`f;1)],1,,testing asyncutil.deferred call that should fail on one server +true,0,0,q,10b ~ .test.bdefres3[0;],1,,asyncutil.deferred call failed on one server with expected result +run,0,0,q,.test.bdefres3:asyncutil.deferred[h;(`f;1)],1,,testing asyncutil.deferred call that should fail on one server +true,0,0,q,10b ~ .test.bdefres3[0;],1,,asyncutil.deferred call failed on one server with expected result + +comment,,,,,,,Testing asyncutil.postback +run,0,0,q,".test.bpbrestab:([]handle:();time:();result:())",1,,define table .test.bpbrestab to store results from asyncutil.postback +run,0,0,q,".test.storeresult:{`.test.bpbrestab upsert (.z.w;.z.t;enlist x)}",1,,define a function .test.storeresult to store the posted back results in .test.bpbrestab +run,0,0,q,asyncutil.postback[h;({x+1};2);`.test.storeresult],1,,testing asyncutil.postback that should execute on both servers +run,0,0,q,@[;"";()] each h,1,,wait for server response +true,0,0,q,(enlist 3;enlist 3) ~ -2#exec result from .test.bpbrestab,1,,successful asyncutil.postback with expected result +run,0,0,q,asyncutil.postback[h;({x+`a};2);`.test.storeresult],1,,testing asyncutil.postback call that should fail on both servers +run,0,0,q,@[;"";()] each h,1,,wait for server response +true,0,0,q,all all (exec result from -2#.test.bpbrestab) like\: "error*",1,,asyncutil.postback call failed on both servers with expected result +run,0,0,q,asyncutil.postback[h;(`f;1);`.test.storeresult],1,,testing asyncutil.postback call that should fail on one server +run,0,0,q,@[;"";()] each h,1,,wait for server response +true,0,0,q,(enlist 2;enlist "error: server fail: f") ~ exec result from -2#.test.bpbrestab,1,,asyncutil.postback call failed on one server with expected result + +run,0,0,q,@[;"exit 0";()] each neg h,,1,,closing remaining server process diff --git a/main.zip b/main.zip new file mode 100644 index 00000000..dd741ee8 Binary files /dev/null and b/main.zip differ