diff --git a/CMakeLists.txt b/CMakeLists.txt index c3f0b0e..d2243ab 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -104,3 +104,11 @@ add_executable(PrecoderDemo txdemo/precoder_demo/main.cpp ) target_link_libraries(PrecoderDemo PUBLIC WiFiDriver PRIVATE PkgConfig::libusb) + +# Stream-link TX: reads length-prefixed PSDU bodies from stdin and transmits +# one probe-request per body. Driven by tools/precoder/stream_tx.py — the +# C++/Python split keeps libusb out of the framing math. +add_executable(StreamTxDemo + txdemo/stream_tx_demo/main.cpp +) +target_link_libraries(StreamTxDemo PUBLIC WiFiDriver PRIVATE PkgConfig::libusb) diff --git a/demo/main.cpp b/demo/main.cpp index 2e1a384..55a0767 100644 --- a/demo/main.cpp +++ b/demo/main.cpp @@ -65,6 +65,19 @@ static void packetProcessor(const Packet &packet) { * 6M OFDM and that its shaped PSDU bytes round-tripped intact — the * two-adapter, no-SDR verification. First few hits only. */ static const bool dump_body = std::getenv("DEVOURER_DUMP_BODY") != nullptr; + /* DEVOURER_STREAM_OUT=1: like DEVOURER_DUMP_BODY but uncapped — print + * every canonical-SA frame's body for the stream RX driver + * (tools/precoder/stream_rx.py) to decode. Tag is distinct so the + * regular dump_body capture stays uncluttered. */ + static const bool stream_out = std::getenv("DEVOURER_STREAM_OUT") != nullptr; + if (stream_out) { + printf("rate=%u len=%zu body=", + packet.RxAtrib.data_rate, packet.Data.size()); + for (size_t i = 24; i < packet.Data.size(); ++i) + printf("%02x", packet.Data[i]); + printf("\n"); + fflush(stdout); + } if (dump_body && hits <= 5) { /* Tier-2 health diagnostics alongside the byte mirror: rate (0x04 = * 6M OFDM), per-stream RSSI/EVM/SNR (link quality — content-blind), diff --git a/tests/precoder_stream_roundtrip.py b/tests/precoder_stream_roundtrip.py new file mode 100644 index 0000000..d061cf9 --- /dev/null +++ b/tests/precoder_stream_roundtrip.py @@ -0,0 +1,327 @@ +#!/usr/bin/env python3 +"""Two-adapter byte-stream round-trip on the precoder stream link. + +Streams `--bytes N` random bytes from one devourer adapter (StreamTxDemo, +fed by tools/precoder/stream_tx.py) to a second devourer adapter +(WiFiDriverDemo with DEVOURER_STREAM_OUT=1), then decodes the received + lines via stream.decode_body and checks: + + 1. TRANSPORT — at least `--min-frames` frames decoded. + 2. RX RATE — every decoded frame's RX rate index == DESC_RATE6M (0x04), + i.e. the chip really transmitted legacy 6M OFDM. + 3. BYTES — concatenating decoded payloads (by ascending seq, after + dedup) reproduces the source bytes exactly. + +With `--shape '0:+1,10:-1,...'` the same shape is used on both sides AND each +captured body is fed back through `emulate_chip` to confirm the encoded +subcarriers honour the pin pattern. That is a model-bound check (it proves +"the bytes we sent encode a body that, under our model, puts ±1 at the +pinned subcarriers"); proving the chip actually radiates those subcarriers +on-air still needs an SDR / BB-dbgport observer — see tools/precoder/README.md. + +Defaults match PrecoderDemo's matrix-validated cell: 2.4 GHz channel 6, +RTL8812AU TX and RTL8821AU / RTL8811AU RX. RTL8814AU is out of scope (TX +flakiness, issue #36). + + sudo python3 tests/precoder_stream_roundtrip.py \\ + --tx-pid 0x8812 --rx-pid 0x8813 --channel 6 --bytes 4096 + sudo python3 tests/precoder_stream_roundtrip.py \\ + --tx-pid 0x8812 --rx-pid 0x8813 --channel 6 --bytes 1024 \\ + --shape '0:+1,8:-1,16:+1,24:-1,32:+1' +""" + +from __future__ import annotations + +import argparse +import os +import random +import re +import subprocess +import sys +import threading +import time +from pathlib import Path + +HERE = Path(__file__).resolve().parent +REPO = HERE.parent +PRECODER = REPO / "tools" / "precoder" +sys.path.insert(0, str(PRECODER)) +import stream # noqa: E402 +import encode_subcarriers as enc # noqa: E402 + +DESC_RATE6M = 0x04 +_STREAM_RE = re.compile( + r"rate=(?P\d+) len=(?P\d+) body=(?P[0-9a-fA-F]*)" +) + + +def parse_shape(s: str | None): + if not s: + return None + out = {} + for tok in s.split(","): + k, v = tok.split(":") + sign = +1 if v.strip().lstrip("+") in ("1", "") else -1 + out[int(k.strip(), 0)] = sign + return out + + +class Reader(threading.Thread): + """Drain a subprocess' stdout into a line list.""" + + def __init__(self, proc: subprocess.Popen): + super().__init__(daemon=True) + self.proc = proc + self.lines: list[str] = [] + self._stop = False + + def run(self) -> None: + assert self.proc.stdout is not None + for line in self.proc.stdout: + self.lines.append(line) + if self._stop: + break + + def stop(self) -> None: + self._stop = True + + +def _check_shape_honoured(body: bytes, plen: int, shape: dict, *, seed: int, + offset: int, entry_state: int) -> tuple[bool, int]: + """Run emulate_chip over the encoded-symbol prefix of `body` and verify + pinned subcarriers match shape. + + `plen` is the framed payload length recovered from the decoded envelope; + it lets us compute exactly how many OFDM symbols the encoder used (the + received body trails extra chip bytes — 4 B FCS + RX trailer — that the + encoder didn't shape and would otherwise look like violations). + + Returns (ok, n_violations) where n_violations counts (symbol, + subcarrier) cells whose model output differs from the requested pin. + """ + import numpy as np + + phy = enc._LEGACY_BPSK + layout = stream.plan_body(plen, shape, phy=phy) + bytes_per_sym = phy.n_dbps // 8 + expected = layout.n_sym * bytes_per_sym + if len(body) < expected: + return False, -1 + psdu_bits = enc.bytes_to_bits(body[:expected]) + sub = enc.emulate_chip(psdu_bits, seed, phy, layout.n_sym, + offset=offset, entry_state=entry_state) + violations = 0 + for sc, want in shape.items(): + violations += int(np.sum(sub[:, sc] != want)) + return violations == 0, violations + + +def run_test(args) -> int: + rng = random.Random(args.data_seed) + data = bytes(rng.randint(0, 255) for _ in range(args.bytes)) + print(f"[data] {len(data)} bytes (seed=0x{args.data_seed:x})") + + workdir = Path(args.workdir) + workdir.mkdir(parents=True, exist_ok=True) + data_path = workdir / "tx_data.bin" + data_path.write_bytes(data) + + if args.dry_run: + print(f"[dry-run] would TX {args.tx_bin} via stream_tx.py " + f"({args.bytes}B, shape={args.shape!r}), RX {args.rx_bin} on " + f"ch{args.channel} for up to {args.duration}s") + return 0 + + # 1. RX up first. + rx_env = dict(os.environ, DEVOURER_PID=args.rx_pid, DEVOURER_VID=args.rx_vid, + DEVOURER_CHANNEL=str(args.channel), + DEVOURER_STREAM_OUT="1", DEVOURER_USB_QUIET="1") + print(f"[rx] launching {args.rx_bin} vid={args.rx_vid} pid={args.rx_pid}") + rx = subprocess.Popen([args.rx_bin], env=rx_env, + stdout=subprocess.PIPE, stderr=subprocess.STDOUT, + text=True, bufsize=1) + rx_reader = Reader(rx) + rx_reader.start() + time.sleep(args.rx_warmup) + + # 2. TX side: stream_tx.py | StreamTxDemo, with shape/seed/offset/etc. + tx_env = dict(os.environ, DEVOURER_PID=args.tx_pid, DEVOURER_VID=args.tx_vid, + DEVOURER_CHANNEL=str(args.channel), DEVOURER_USB_QUIET="1") + pyenv = dict(os.environ) + if args.shape: + pyenv["DEVOURER_STREAM_SHAPE"] = args.shape + if args.seed is not None: + pyenv["DEVOURER_STREAM_SEED"] = hex(args.seed) + if args.offset is not None: + pyenv["DEVOURER_STREAM_OFFSET"] = str(args.offset) + if args.entry_state is not None: + pyenv["DEVOURER_STREAM_ENTRY_STATE"] = hex(args.entry_state) + + encoder_cmd = ["uv", "run", "python", "stream_tx.py", "--input", + str(data_path), "--repeat", str(args.repeat)] + print(f"[tx] {' '.join(encoder_cmd)} | {args.tx_bin}") + encoder = subprocess.Popen(encoder_cmd, cwd=str(PRECODER), env=pyenv, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + tx = subprocess.Popen([args.tx_bin, "--interval-ms", + str(args.interval_ms)], + env=tx_env, stdin=encoder.stdout, + stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + if encoder.stdout is not None: + encoder.stdout.close() # let the encoder receive SIGPIPE if tx dies + + expected_frames = max(1, (args.bytes + 13) // 14) # mtu=14 default + + # 3. Collect until expected frames received or duration elapsed. + deadline = time.monotonic() + args.duration + try: + while time.monotonic() < deadline: + hits = sum(1 for l in rx_reader.lines if _STREAM_RE.search(l)) + if hits >= expected_frames + 2: # small buffer for retransmits + time.sleep(0.5) + break + time.sleep(0.5) + finally: + for p in (tx, encoder, rx): + try: + p.terminate() + except ProcessLookupError: + pass + rx_reader.stop() + for p in (tx, encoder, rx): + try: + p.wait(timeout=3) + except subprocess.TimeoutExpired: + p.kill() + + if args.keep: + (workdir / "rx.log").write_text("".join(rx_reader.lines)) + encoder_err = encoder.stderr.read() if encoder.stderr else b"" + (workdir / "encoder.log").write_bytes(encoder_err) + print(f"[keep] logs -> {workdir}") + + # 4. Verdict. + shape = parse_shape(args.shape) + seed = args.seed if args.seed is not None else stream.DEFAULT_SEED + offset = args.offset if args.offset is not None else 0 + entry_state = (args.entry_state if args.entry_state is not None else 0) + + by_seq: dict[int, bytes] = {} + rate_mismatch = 0 + malformed = 0 + shape_violations = 0 + shape_checked = 0 + bodies_for_shape: list[tuple[bytes, int]] = [] # (body, plen) + + for l in rx_reader.lines: + m = _STREAM_RE.search(l) + if not m: + continue + rate = int(m.group("rate")) + if rate != DESC_RATE6M: + rate_mismatch += 1 + body = bytes.fromhex(m.group("hex")) + frame = stream.decode_body(body, shape=shape, seed=seed, + offset=offset, entry_state=entry_state) + if frame is None: + malformed += 1 + continue + if frame.seq not in by_seq: + by_seq[frame.seq] = frame.payload + if shape: + bodies_for_shape.append((body, len(frame.payload))) + + print(f"\n--- stream round-trip ({args.tx_pid} TX → {args.rx_pid} RX) ---") + ok = True + + print(f"[1/3] transport: {len(by_seq)} unique frame(s) decoded, " + f"expected ~{expected_frames}", + "PASS" if len(by_seq) >= args.min_frames else "FAIL") + ok &= len(by_seq) >= args.min_frames + + print(f"[2/3] phy rate: {rate_mismatch} non-OFDM frame(s) out of " + f"{len(by_seq) + malformed + rate_mismatch} captured", + "PASS" if rate_mismatch == 0 else "FAIL") + ok &= rate_mismatch == 0 + + rx_data = b"".join(by_seq[s] for s in sorted(by_seq)) + common = min(len(rx_data), len(data)) + match = common > 0 and rx_data[:common] == data[:common] + if len(by_seq) >= expected_frames: + match = match and rx_data == data + print(f"[3/3] bytes: {common}/{len(data)} compared, " + + ("identical" if match else "DIVERGENT"), + "PASS" if match else "FAIL") + ok &= match + if not match and common: + diff = next((i for i in range(common) if rx_data[i] != data[i]), None) + print(f" first diff at byte {diff}: rx={rx_data[diff]:#04x} " + f"expected={data[diff]:#04x}") + + if shape: + for body, plen in bodies_for_shape: + shape_checked += 1 + okp, violations = _check_shape_honoured( + body, plen, shape, + seed=seed, offset=offset, entry_state=entry_state) + if not okp: + shape_violations += max(violations, 1) + print(f"[4/4] shape: {shape_checked} body/bodies model-checked, " + f"{shape_violations} subcarrier violation(s) " + + ("PASS" if shape_violations == 0 else "FAIL")) + ok &= shape_violations == 0 + print(" NB: model-bound check only — confirms encoded bytes " + "carry the shape; on-air shape needs SDR / BB-dbgport.") + + print("RESULT:", "PASS" if ok else "FAIL") + return 0 if ok or args.allow_fail else 1 + + +def main(argv: "list[str] | None" = None) -> int: + ap = argparse.ArgumentParser( + description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + ap.add_argument("--tx-pid", required=True) + ap.add_argument("--rx-pid", required=True) + ap.add_argument("--tx-vid", default="0x0bda") + ap.add_argument("--rx-vid", default="0x0bda") + ap.add_argument("--channel", type=int, default=6) + ap.add_argument("--bytes", type=int, default=512, + help="number of stream bytes to send (default 512)") + ap.add_argument("--data-seed", type=lambda s: int(s, 0), default=0xBEEF) + ap.add_argument("--shape", default=None, + help="shape spec passed to both TX and RX, e.g. " + "'0:+1,10:-1,20:+1'. Adds a model-bound subcarrier " + "check on every captured body.") + ap.add_argument("--seed", type=lambda s: int(s, 0), default=None, + help="chip scrambler seed for the encoder/decoder model " + "(byte mode ignores this)") + ap.add_argument("--offset", type=int, default=None, + help="scrambler-phase offset of the body (default 0 — the " + "shape pins are honoured by the model but not " + "on-air; pass 208 to match PrecoderDemo placement)") + ap.add_argument("--entry-state", type=lambda s: int(s, 0), default=None) + ap.add_argument("--tx-bin", default=str(REPO / "build" / "StreamTxDemo")) + ap.add_argument("--rx-bin", default=str(REPO / "build" / "WiFiDriverDemo")) + ap.add_argument("--interval-ms", type=int, default=2) + ap.add_argument("--repeat", type=int, default=4, + help="TX-side per-frame replication to combat early-frame " + "loss during the RX warmup (default 4)") + ap.add_argument("--duration", type=float, default=30.0) + ap.add_argument("--rx-warmup", type=float, default=12.0, + help="seconds to let the RX radio come up before TX. Some " + "chips need ~15s; raise if transport reports 0") + ap.add_argument("--min-frames", type=int, default=1, + help="minimum decoded frames to call the transport check " + "a PASS (default 1)") + ap.add_argument("--workdir", default="/tmp/precoder-stream-roundtrip") + ap.add_argument("--keep", action="store_true") + ap.add_argument("--dry-run", action="store_true") + ap.add_argument("--allow-fail", action="store_true") + args = ap.parse_args(argv) + return run_test(args) + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tests/precoder_stream_smoke.py b/tests/precoder_stream_smoke.py new file mode 100644 index 0000000..b0ef3b2 --- /dev/null +++ b/tests/precoder_stream_smoke.py @@ -0,0 +1,109 @@ +"""Repo-level smoke for the stream-link layer (tools/precoder/stream.py). + +Pairs with the exhaustive in-subtree tests in tools/precoder/test_stream.py +(36 cases, requires the uv env). This file is what CI / a casual contributor +runs without the uv toolchain — it skips cleanly when numpy isn't installed +on the system Python. + +It drives the same encode → simulated-on-wire → decode path the two-adapter +hardware harness uses, but synthesises the `` line that +WiFiDriverDemo would print, so no USB is involved. + +The smoke covers: + * byte-mode round-trip — frame the input, encode, decode, reassemble; + * shape-mode round-trip — same, plus verifying each encoded body's pinned + subcarriers match the requested ±1 under `emulate_chip` (model-bound; + on-air verification still needs SDR / BB-dbgport). +""" + +from __future__ import annotations + +import struct +import sys +from pathlib import Path + +import pytest + +np = pytest.importorskip("numpy") + +PRECODER = Path(__file__).resolve().parent.parent / "tools" / "precoder" +sys.path.insert(0, str(PRECODER)) +import encode_subcarriers as enc # noqa: E402 +import stream # noqa: E402 + + +def _frame_psdu_for_wire(body: bytes) -> bytes: + """Round-trip the StreamTxDemo wire format: length-prefixed PSDU body.""" + return struct.pack(" list[bytes]: + out: list[bytes] = [] + i = 0 + while i < len(buf): + (n,) = struct.unpack(" dict: + return {0: +1, 8: -1, 16: +1, 24: -1, 32: +1} + + +def test_byte_mode_repo_smoke(): + data = bytes(range(140)) # 10 frames at the default mtu=14 + frames = stream.pack_stream(data) + bodies = [] + for f in frames: + body, _ = stream.encode_body(f) + bodies.append(body) + + # Simulate the wire: length-prefixed PSDU bodies in, the same bodies + # back out (StreamTxDemo emits them, demo's DEVOURER_STREAM_OUT dumps + # them — identity on the byte plane). + wire = b"".join(_frame_psdu_for_wire(b) for b in bodies) + rx_bodies = _parse_length_prefixed(wire) + assert rx_bodies == bodies + + decoded = [stream.decode_body(b) for b in rx_bodies] + assert all(d is not None for d in decoded) + by_seq = {d.seq: d.payload for d in decoded} + assert b"".join(by_seq[s] for s in sorted(by_seq)) == data + + +def test_shape_mode_repo_smoke(): + data = bytes(range(80)) + shape = _build_shape_dict() + frames = stream.pack_stream(data) + bodies_layouts = [stream.encode_body(f, shape=shape) for f in frames] + + # Per-body shape check: emulate_chip(encoded_body) must put the + # requested ±1 at every pinned subcarrier of every body symbol. + for body, layout in bodies_layouts: + psdu_bits = enc.bytes_to_bits(body)[: layout.n_sym * stream._LEGACY_BPSK.n_dbps] + sub = enc.emulate_chip(psdu_bits, stream.DEFAULT_SEED, stream._LEGACY_BPSK, + n_sym=layout.n_sym, offset=0, entry_state=0) + for sc, want in shape.items(): + assert np.all(sub[:, sc] == want), \ + f"shape violation at subcarrier {sc}" + + # Wire-roundtrip the bodies and re-decode with the same shape. + wire = b"".join(_frame_psdu_for_wire(b) for b, _ in bodies_layouts) + rx_bodies = _parse_length_prefixed(wire) + decoded = [stream.decode_body(b, shape=shape) for b in rx_bodies] + assert all(d is not None for d in decoded) + by_seq = {d.seq: d.payload for d in decoded} + assert b"".join(by_seq[s] for s in sorted(by_seq)) == data + + +def test_shape_mode_grows_body_when_pin_rank_rises(): + """As more subcarriers are pinned, per-symbol capacity drops and the body + must grow to fit the envelope at the reduced rate.""" + f = stream.StreamFrame(seq=0, total=1, payload=b"x" * 14) # max byte-mode payload + body0, layout0 = stream.encode_body(f) + assert layout0.body_bytes == stream.DEFAULT_BODY_BYTES + body1, layout1 = stream.encode_body(f, shape={0: +1, 8: -1, 16: +1, 24: -1, 32: +1}) + assert layout1.body_bytes > layout0.body_bytes + assert layout1.capacity_per_sym < layout0.capacity_per_sym diff --git a/tools/precoder/pyproject.toml b/tools/precoder/pyproject.toml index 7f6bb9b..2596fe7 100644 --- a/tools/precoder/pyproject.toml +++ b/tools/precoder/pyproject.toml @@ -24,4 +24,4 @@ dev = [ [tool.pytest.ini_options] # test_pipeline.py lives alongside the module; default (prepend) import mode # puts this dir on sys.path so `import encode_subcarriers` resolves. -testpaths = ["test_pipeline.py"] +testpaths = ["test_pipeline.py", "test_stream.py"] diff --git a/tools/precoder/stream.py b/tools/precoder/stream.py new file mode 100644 index 0000000..3b3e995 --- /dev/null +++ b/tools/precoder/stream.py @@ -0,0 +1,579 @@ +"""Stream framing on top of the precoder. + +Wraps PSDU bodies in a tiny seq/length/CRC envelope so a TX→RX pair can carry +a sequenced byte stream over the legacy-6M-OFDM probe-request link from +PrecoderDemo. Optionally also encodes a per-OFDM-symbol shape constraint +(pinned per-subcarrier ±1 values) on every body symbol — the remaining +null-space dimensions of the BCC+interleaver linear map carry the framing +bits, so one frame demonstrates frequency-domain control AND sequenced data +delivery in the same packet. + +Frame layout (body bytes after the MAC header, before the chip's scrambler): + + MAGIC (2) = 0xD5DE little-endian + SEQ (2) = little-endian wrapping counter + TOTAL (2) = total frames in this message (0 = unbounded stream) + PLEN (2) = payload length in bytes + PAYLOAD (PLEN bytes) + PAD (body - 10 - PLEN bytes; byte-mode only, zeros) + CRC16 (2) = CRC-16-CCITT(0xFFFF init) over MAGIC..PAD, little-endian + +Envelope overhead is 10 bytes. A 24-byte body therefore carries up to 14 bytes +of payload when no shape is requested; with shape, the body grows to fit (see +`plan_body` / `encode_body` for the math). + +CONVENTION — the bytes returned/consumed here are the DESCRAMBLED body bytes. +They go directly into `send_packet`'s 802.11 body (the chip applies its own +scrambler before BCC) and they come directly out of devourer's +`DEVOURER_DUMP_BODY` (the chip has already descrambled them on RX). The chip +scrambler is therefore invisible to the byte-stream caller; only the shape +codec needs to know the scrambler seed/offset to invert the per-subcarrier +constraint mapping. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Iterable, Iterator, Optional, Union + +import numpy as np + +from encode_subcarriers import ( + DEFAULT_SEED, + PhyParams, + _LEGACY_BPSK, + bcc_encode, + bcc_final_state, + bits_to_bytes, + bpsk_demap, + bytes_to_bits, + interleave, + scrambler_sequence, +) + + +MAGIC = 0xD5DE +HEADER_LEN = 8 # MAGIC + SEQ + TOTAL + PLEN +TRAILER_LEN = 2 # CRC16 +ENVELOPE_LEN = HEADER_LEN + TRAILER_LEN # 10 bytes overhead + +# Default body size matches PrecoderDemo's 24-byte probe-request body. +DEFAULT_BODY_BYTES = 24 + + +# --------------------------------------------------------------------------- # +# CRC-16-CCITT-FALSE (poly 0x1021, init 0xFFFF, no reflection, no xorout) +# --------------------------------------------------------------------------- # +def crc16_ccitt(data: bytes, init: int = 0xFFFF) -> int: + crc = init & 0xFFFF + for byte in data: + crc ^= (byte & 0xFF) << 8 + for _ in range(8): + if crc & 0x8000: + crc = ((crc << 1) ^ 0x1021) & 0xFFFF + else: + crc = (crc << 1) & 0xFFFF + return crc + + +# --------------------------------------------------------------------------- # +# Generator matrix for one OFDM symbol +# --------------------------------------------------------------------------- # +def compute_generator(phy: PhyParams = _LEGACY_BPSK) -> np.ndarray: + """M ∈ GF(2)^{n_cbps × n_dbps} so sub = M @ scrambled_info ⊕ b(entry_state). + + The BCC+interleaver pipeline is GF(2)-linear in the per-symbol scrambled + input bits, with the 6-bit BCC entry_state contributing an affine offset + (see `compute_state_offset`). M is built by running the forward pipeline + on each unit input vector from a zero entry state. + """ + M = np.zeros((phy.n_cbps, phy.n_dbps), dtype=np.uint8) + for k in range(phy.n_dbps): + unit = np.zeros(phy.n_dbps, dtype=np.uint8) + unit[k] = 1 + coded = bcc_encode(unit, init_state=0) + sub = interleave(coded, phy) + M[:, k] = sub + return M + + +def compute_state_offset(entry_state: int, + phy: PhyParams = _LEGACY_BPSK) -> np.ndarray: + """b(entry_state) = pipeline(all-zero scrambled-input, entry_state). + + Encodes the affine constant in sub = M @ scrambled_info ⊕ b(entry_state). + """ + coded = bcc_encode(np.zeros(phy.n_dbps, dtype=np.uint8), + init_state=entry_state) + return interleave(coded, phy) + + +# --------------------------------------------------------------------------- # +# GF(2) linear algebra +# --------------------------------------------------------------------------- # +@dataclass(frozen=True) +class GF2Solution: + consistent: bool + particular: np.ndarray # (n,) + null_basis: np.ndarray # (n, k); each column is a kernel vector + free_cols: np.ndarray # (k,); column indices the free vars live at + + @property + def n_free(self) -> int: + return int(self.free_cols.size) + + +def gf2_solve(A: np.ndarray, b: np.ndarray) -> GF2Solution: + """Solve A x = b over GF(2) by RREF. + + `null_basis` columns are constructed so column j has exactly one `1` at + row `free_cols[j]` and zero at every other row in `free_cols`; that + convention makes the encoder/decoder symmetric — encode sets `x[free_cols[j]] + = data[j]`, decode reads `data[j] = x[free_cols[j]] ⊕ particular[free_cols[j]]`. + + When `A` has zero rows (no constraints) the identity basis on n variables + is returned, with `free_cols = arange(n)`. + """ + A = np.asarray(A, dtype=np.uint8).copy() + b = np.asarray(b, dtype=np.uint8).copy() + m, n = A.shape + if m != b.shape[0]: + raise ValueError(f"A rows ({m}) != b length ({b.shape[0]})") + if m == 0: + return GF2Solution( + consistent=True, + particular=np.zeros(n, dtype=np.uint8), + null_basis=np.eye(n, dtype=np.uint8), + free_cols=np.arange(n, dtype=np.int64), + ) + aug = np.column_stack([A, b]) + pivot_cols: list[int] = [] + pivot_rows: list[int] = [] + row = 0 + for col in range(n): + pivot = -1 + for r in range(row, m): + if aug[r, col]: + pivot = r + break + if pivot < 0: + continue + if pivot != row: + aug[[row, pivot]] = aug[[pivot, row]] + for r in range(m): + if r != row and aug[r, col]: + aug[r] ^= aug[row] + pivot_cols.append(col) + pivot_rows.append(row) + row += 1 + if row == m: + break + for r in range(row, m): + if aug[r, n]: + return GF2Solution( + consistent=False, + particular=np.zeros(n, dtype=np.uint8), + null_basis=np.zeros((n, 0), dtype=np.uint8), + free_cols=np.empty(0, dtype=np.int64), + ) + particular = np.zeros(n, dtype=np.uint8) + for pr, pc in zip(pivot_rows, pivot_cols): + particular[pc] = aug[pr, n] + pivot_set = set(pivot_cols) + free_cols = np.array([c for c in range(n) if c not in pivot_set], + dtype=np.int64) + null = np.zeros((n, free_cols.size), dtype=np.uint8) + for j, fc in enumerate(free_cols): + null[fc, j] = 1 + for pr, pc in zip(pivot_rows, pivot_cols): + null[pc, j] = aug[pr, fc] + return GF2Solution( + consistent=True, + particular=particular, + null_basis=null, + free_cols=free_cols, + ) + + +def gf2_rank(A: np.ndarray) -> int: + if A.shape[0] == 0: + return 0 + sol = gf2_solve(A, np.zeros(A.shape[0], dtype=np.uint8)) + return A.shape[1] - sol.n_free + + +# --------------------------------------------------------------------------- # +# Shape spec +# --------------------------------------------------------------------------- # +ShapeSpec = Union[np.ndarray, dict, None] + + +def normalise_shape(shape: ShapeSpec, n_sd: int) -> tuple[np.ndarray, np.ndarray]: + """Normalise a shape spec to (pinned_subcarrier_indices, pinned_bits). + + Accepts either a dict {sc_idx: ±1} or an array of length n_sd with 0 = + don't care, ±1 = pinned. Returns the sorted index array and the matching + uint8 bit values (0 for +1, 1 for −1). + """ + if shape is None: + return np.empty(0, dtype=np.int64), np.empty(0, dtype=np.uint8) + if isinstance(shape, dict): + items = sorted(shape.items()) + idx = np.array([k for k, _ in items], dtype=np.int64) + vals = np.array([v for _, v in items], dtype=np.int8) + else: + arr = np.asarray(shape, dtype=np.int8) + if arr.shape != (n_sd,): + raise ValueError(f"shape array must have length {n_sd}, got {arr.shape}") + idx = np.nonzero(arr != 0)[0].astype(np.int64) + vals = arr[idx] + if idx.size: + if idx.min() < 0 or idx.max() >= n_sd: + raise ValueError(f"shape index out of range [0, {n_sd})") + if len(set(idx.tolist())) != len(idx): + raise ValueError("shape has duplicate subcarrier indices") + if not np.all(np.isin(vals, [1, -1])): + raise ValueError("shape values must be +1 or -1") + return idx, bpsk_demap(vals).astype(np.uint8) + + +# --------------------------------------------------------------------------- # +# Stream frame envelope +# --------------------------------------------------------------------------- # +@dataclass(frozen=True) +class StreamFrame: + seq: int + total: int + payload: bytes + + def envelope_bytes(self, body_bytes: int) -> bytes: + """Build the (descrambled) envelope image at exactly `body_bytes` bytes. + + Layout: HEADER(8) | PAYLOAD(plen) | CRC(2) | PAD(body - 10 - plen). + CRC sits immediately after the payload — at the PLEN-determined offset + — so a parser can locate it from PLEN alone, ignoring any trailing + bytes the chip appends (FCS, RX trailer) or the encoder adds for + body-size alignment. + """ + max_payload = body_bytes - ENVELOPE_LEN + if len(self.payload) > max_payload: + raise ValueError( + f"payload {len(self.payload)}B exceeds capacity " + f"{max_payload}B (body={body_bytes}B)" + ) + out = bytearray() + out += MAGIC.to_bytes(2, "little") + out += (self.seq & 0xFFFF).to_bytes(2, "little") + out += (self.total & 0xFFFF).to_bytes(2, "little") + out += len(self.payload).to_bytes(2, "little") + out += bytes(self.payload) + crc = crc16_ccitt(bytes(out)) + out += crc.to_bytes(2, "little") + pad = body_bytes - len(out) + if pad > 0: + out += bytes(pad) + return bytes(out) + + +def parse_envelope(body: bytes) -> Optional[StreamFrame]: + """Inverse of `StreamFrame.envelope_bytes`. Returns None on a magic / + CRC / PLEN mismatch (treated as "not one of our frames"). + + PLEN is authoritative for envelope size, so trailing bytes added by the + chip (typically the 4-byte 802.11 FCS plus any RX trailer that survives + `demo/main.cpp`'s body dump) are ignored. The CRC is read from + `HEADER_LEN + plen .. HEADER_LEN + plen + 2`, NOT from `body[-2:]`. + """ + if len(body) < ENVELOPE_LEN: + return None + if int.from_bytes(body[0:2], "little") != MAGIC: + return None + plen = int.from_bytes(body[6:8], "little") + envelope_size = HEADER_LEN + plen + TRAILER_LEN + if envelope_size > len(body): + return None + expected = int.from_bytes( + body[envelope_size - TRAILER_LEN:envelope_size], "little") + actual = crc16_ccitt(bytes(body[:envelope_size - TRAILER_LEN])) + if expected != actual: + return None + seq = int.from_bytes(body[2:4], "little") + total = int.from_bytes(body[4:6], "little") + payload = bytes(body[HEADER_LEN:HEADER_LEN + plen]) + return StreamFrame(seq=seq, total=total, payload=payload) + + +# --------------------------------------------------------------------------- # +# Body planning +# --------------------------------------------------------------------------- # +@dataclass(frozen=True) +class BodyLayout: + """Resolved geometry for one shaped or byte-mode body. + + `n_sym` is the count of OFDM data symbols the body spans; `body_bytes` is + `n_sym × n_dbps / 8` (legacy: 3 bytes/symbol). `capacity_per_sym` is the + data-bit budget per OFDM symbol after the shape constraint (= n_dbps when + no shape, = n_dbps - rank(M[S,:]) otherwise). + """ + body_bytes: int + n_sym: int + capacity_per_sym: int + payload_capacity: int + + +def plan_body(payload_len: int, + shape: ShapeSpec = None, + *, + body_bytes: Optional[int] = None, + phy: PhyParams = _LEGACY_BPSK) -> BodyLayout: + """Resolve n_sym, body_bytes, and per-symbol capacity for a given payload + size and shape spec. + + Without shape: body is fixed at `body_bytes` (default 24); payload must + fit in `body_bytes - ENVELOPE_LEN`. With shape: capacity per symbol drops + to `n_dbps - rank(M[S,:])` and `n_sym` grows to fit the envelope at this + reduced rate; `body_bytes`, when passed, acts as a floor. + """ + if phy.n_dbps % 8 != 0: + raise NotImplementedError( + f"this PoC assumes n_dbps ({phy.n_dbps}) is a multiple of 8 " + "so one OFDM symbol = whole bytes; legacy 6M (24) satisfies this" + ) + bytes_per_sym = phy.n_dbps // 8 + + pin_idx, _ = normalise_shape(shape, phy.n_sd) + if pin_idx.size == 0: + if body_bytes is None: + body_bytes = DEFAULT_BODY_BYTES + if body_bytes % bytes_per_sym != 0: + raise ValueError( + f"body_bytes ({body_bytes}) must be a multiple of " + f"{bytes_per_sym} (one OFDM symbol)" + ) + if payload_len > body_bytes - ENVELOPE_LEN: + raise ValueError( + f"payload {payload_len}B exceeds {body_bytes - ENVELOPE_LEN}B " + f"(body={body_bytes}B, envelope={ENVELOPE_LEN}B)" + ) + return BodyLayout( + body_bytes=body_bytes, + n_sym=body_bytes // bytes_per_sym, + capacity_per_sym=phy.n_dbps, + payload_capacity=body_bytes - ENVELOPE_LEN, + ) + + M = compute_generator(phy) + rank = gf2_rank(M[pin_idx, :]) + capacity = phy.n_dbps - rank + if capacity == 0: + raise ValueError( + "shape pins all 24 degrees of freedom (rank=24); no room for " + "framing bits — drop a pin or switch to byte mode" + ) + envelope_bits = (ENVELOPE_LEN + payload_len) * 8 + needed_sym = (envelope_bits + capacity - 1) // capacity + floor_sym = (body_bytes or 0) // bytes_per_sym + n_sym = max(needed_sym, floor_sym) + return BodyLayout( + body_bytes=n_sym * bytes_per_sym, + n_sym=n_sym, + capacity_per_sym=capacity, + payload_capacity=(n_sym * capacity) // 8 - ENVELOPE_LEN, + ) + + +# --------------------------------------------------------------------------- # +# Body encode / decode +# --------------------------------------------------------------------------- # +def encode_body( + frame: StreamFrame, + shape: ShapeSpec = None, + *, + body_bytes: Optional[int] = None, + phy: PhyParams = _LEGACY_BPSK, + seed: int = DEFAULT_SEED, + offset: int = 0, + entry_state: int = 0, +) -> tuple[bytes, BodyLayout]: + """Build the descrambled body bytes for one stream frame. + + Returns (body, layout). Byte mode (`shape=None`): returns the envelope + sized to `layout.body_bytes`. Shape mode: each symbol's info bits are + chosen so that the chip's on-air subcarriers honour the pin pattern AND + the symbol's free dimensions carry the envelope bits. + + `offset` and `entry_state` describe the start of the body within a real + PSDU's scrambler/BCC stream (README's offset=208 / entry_state derived + from SERVICE+MAC-header example). Byte mode ignores them. + """ + layout = plan_body(len(frame.payload), shape, body_bytes=body_bytes, phy=phy) + + if shape is None: + return frame.envelope_bytes(layout.body_bytes), layout + + # Shape mode: the envelope is byte-tight (10 + payload bytes) and its bits + # are bit-packed at `capacity_per_sym` bits/symbol across `n_sym` symbols. + envelope = frame.envelope_bytes(ENVELOPE_LEN + len(frame.payload)) + envelope_bits = bytes_to_bits(envelope) + slot_bits = layout.n_sym * layout.capacity_per_sym + assert envelope_bits.size <= slot_bits, "plan_body undersized the shaped body" + packed = np.zeros(slot_bits, dtype=np.uint8) + packed[:envelope_bits.size] = envelope_bits + + pin_idx, pin_vals = normalise_shape(shape, phy.n_sd) + M = compute_generator(phy) + M_S = M[pin_idx, :] + + body_info_bits = np.empty(layout.n_sym * phy.n_dbps, dtype=np.uint8) + state = entry_state & 0x3F + cur_offset = offset + for i in range(layout.n_sym): + b_state = compute_state_offset(state, phy) + scr_chunk = scrambler_sequence(seed, cur_offset + phy.n_dbps)[cur_offset:] + rhs = pin_vals.copy() + rhs ^= (M_S @ scr_chunk) & 1 + rhs ^= b_state[pin_idx] + sol = gf2_solve(M_S, rhs) + if not sol.consistent: + raise ValueError( + f"shape pin values are inconsistent at symbol {i} " + f"(state={state:#x}, offset={cur_offset}); the pinned " + "subcarriers are linearly dependent with conflicting " + "targets — pick a different ±1 pattern on the pinned set" + ) + data_slice = packed[i * layout.capacity_per_sym: + (i + 1) * layout.capacity_per_sym] + info_i = (sol.particular ^ ((sol.null_basis @ data_slice) & 1)).astype(np.uint8) + body_info_bits[i * phy.n_dbps:(i + 1) * phy.n_dbps] = info_i + scrambled = info_i ^ scr_chunk + state = bcc_final_state(scrambled, init_state=state) + cur_offset += phy.n_dbps + + return bits_to_bytes(body_info_bits), layout + + +def decode_body( + body: bytes, + shape: ShapeSpec = None, + *, + phy: PhyParams = _LEGACY_BPSK, + seed: int = DEFAULT_SEED, + offset: int = 0, + entry_state: int = 0, +) -> Optional[StreamFrame]: + """Recover a StreamFrame from a received descrambled body. + + Byte mode: the body IS the envelope (passthrough to parse_envelope). + Shape mode: per-symbol info bits are projected onto the null basis to + recover the embedded envelope bits. Returns None on magic/CRC/PLEN + failure (treated as "not one of our frames"). + """ + if shape is None: + return parse_envelope(bytes(body)) + + pin_idx, pin_vals = normalise_shape(shape, phy.n_sd) + M = compute_generator(phy) + M_S = M[pin_idx, :] + rank = gf2_rank(M_S) + capacity = phy.n_dbps - rank + if capacity == 0: + return None + + bytes_per_sym = phy.n_dbps // 8 + if len(body) < bytes_per_sym: + return None + n_sym_avail = len(body) // bytes_per_sym + info_bits = bytes_to_bits(bytes(body)) + + # Decode just enough symbols to cover the envelope; trailer / FCS / + # chip-padding past that isn't shape-encoded and would trip the + # null-space check below. PLEN is known only after the first + # ceil(HEADER_LEN*8 / capacity) symbols, so the limit grows mid-loop. + n_sym_header = (HEADER_LEN * 8 + capacity - 1) // capacity + if n_sym_avail < n_sym_header: + return None + + recovered = np.zeros(n_sym_avail * capacity, dtype=np.uint8) + n_sym_needed = n_sym_header + state = entry_state & 0x3F + cur_offset = offset + decoded = 0 + for i in range(n_sym_avail): + b_state = compute_state_offset(state, phy) + scr_chunk = scrambler_sequence(seed, cur_offset + phy.n_dbps)[cur_offset:] + info_i = info_bits[i * phy.n_dbps:(i + 1) * phy.n_dbps].copy() + rhs = pin_vals.copy() + rhs ^= (M_S @ scr_chunk) & 1 + rhs ^= b_state[pin_idx] + sol = gf2_solve(M_S, rhs) + if not sol.consistent: + return None + delta = (info_i ^ sol.particular) & 1 + data_slice = delta[sol.free_cols].astype(np.uint8) + if not np.array_equal((sol.null_basis @ data_slice) & 1, delta): + return None + recovered[i * capacity:(i + 1) * capacity] = data_slice + scrambled = info_i ^ scr_chunk + state = bcc_final_state(scrambled, init_state=state) + cur_offset += phy.n_dbps + decoded = i + 1 + + if decoded == n_sym_header: + header_bytes_dec = bits_to_bytes(recovered[:HEADER_LEN * 8]) + if int.from_bytes(header_bytes_dec[0:2], "little") != MAGIC: + return None + plen = int.from_bytes(header_bytes_dec[6:8], "little") + envelope_size = ENVELOPE_LEN + plen + n_sym_needed = (envelope_size * 8 + capacity - 1) // capacity + if n_sym_needed > n_sym_avail: + return None + if decoded >= n_sym_needed: + break + + if decoded < n_sym_needed: + return None + header_bytes = bits_to_bytes(recovered[:HEADER_LEN * 8]) + plen = int.from_bytes(header_bytes[6:8], "little") + envelope = bits_to_bytes(recovered[:(ENVELOPE_LEN + plen) * 8]) + return parse_envelope(envelope) + + +# --------------------------------------------------------------------------- # +# Stream helpers — chunk a byte buffer into frames and back +# --------------------------------------------------------------------------- # +def pack_stream(data: bytes, + mtu: int = DEFAULT_BODY_BYTES - ENVELOPE_LEN, + *, + seq_start: int = 0) -> list[StreamFrame]: + """Chunk `data` into StreamFrames of payload size <= `mtu`.""" + if mtu <= 0: + raise ValueError("mtu must be positive") + if not data: + return [] + chunks = [data[i:i + mtu] for i in range(0, len(data), mtu)] + total = len(chunks) + return [ + StreamFrame(seq=(seq_start + i) & 0xFFFF, total=total, payload=chunks[i]) + for i in range(total) + ] + + +def unpack_stream(frames: Iterable[StreamFrame]) -> Iterator[tuple[int, bytes]]: + """Yield `(seq, payload)` for each frame in arrival order; drop duplicates. + + Gaps are NOT filled — caller's responsibility to detect via the yielded + `seq`. Stops after `total` distinct frames have arrived (when total > 0). + """ + seen: set[int] = set() + total_target = 0 + delivered = 0 + for f in frames: + if f.seq in seen: + continue + seen.add(f.seq) + if f.total and not total_target: + total_target = f.total + yield f.seq, f.payload + delivered += 1 + if total_target and delivered >= total_target: + return diff --git a/tools/precoder/stream_rx.py b/tools/precoder/stream_rx.py new file mode 100644 index 0000000..0b977ff --- /dev/null +++ b/tools/precoder/stream_rx.py @@ -0,0 +1,196 @@ +#!/usr/bin/env python3 +"""Stream RX driver — decodes stream frames out of WiFiDriverDemo's stdout. + +Reads WiFiDriverDemo's stdout on this process's stdin, picks up the +`` lines emitted when DEVOURER_STREAM_OUT=1 is set, decodes +each body via `stream.decode_body` (optionally with a shape constraint), +re-orders by sequence number, and writes the decoded payload bytes to stdout. +Gaps and duplicates are logged to stderr. + +Usage: + DEVOURER_PID=0x8813 DEVOURER_CHANNEL=6 DEVOURER_STREAM_OUT=1 \ + ./build/WiFiDriverDemo | \ + DEVOURER_STREAM_SHAPE='0:+1,10:-1' \ + python3 tools/precoder/stream_rx.py > received.bin + +The env knob set is symmetric with `stream_tx.py` (same shape spec, +DEVOURER_STREAM_SEED / OFFSET / ENTRY_STATE). With no shape set, the RX side +just parses the byte-mode envelope — it never decodes a shape its TX peer +didn't use. + +Modes: + --collect (default): buffer all decoded frames until EOF / --total / a + gap-timeout, then emit payloads in sequence order. Bounded latency, + correct ordering, lossless when no gaps. + --realtime: emit each newly-decoded payload as it arrives; out-of-order + arrivals appear out of order. Useful for live demos. +""" + +from __future__ import annotations + +import argparse +import os +import re +import sys +import time +from typing import Optional + +_HERE = os.path.dirname(os.path.abspath(__file__)) +if _HERE not in sys.path: + sys.path.insert(0, _HERE) + +import stream # noqa: E402 + +# Mirrors stream_tx.py's parser without re-importing it (kept side-effect free). +_STREAM_RE = re.compile( + r"rate=(?P\d+) len=(?P\d+) body=(?P[0-9a-fA-F]*)" +) + + +def parse_shape_env(s: str) -> Optional[dict]: + if not s: + return None + out: dict[int, int] = {} + for tok in s.split(","): + tok = tok.strip() + if not tok: + continue + if ":" not in tok: + raise ValueError(f"shape token {tok!r} missing ':' (want idx:±1)") + k, v = tok.split(":", 1) + k = int(k.strip(), 0) + v = v.strip() + if v in ("+1", "+", "1"): + sign = +1 + elif v in ("-1", "-"): + sign = -1 + else: + raise ValueError(f"shape value {v!r} (token {tok!r}); want ±1") + if k in out: + raise ValueError(f"duplicate subcarrier index {k} in shape") + out[k] = sign + return out or None + + +def _env_int(name: str, default: int) -> int: + raw = os.environ.get(name) + if raw is None or raw == "": + return default + return int(raw, 0) + + +def main(argv: Optional[list[str]] = None) -> int: + ap = argparse.ArgumentParser( + description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + ap.add_argument("--shape", default=None, + help="override DEVOURER_STREAM_SHAPE") + ap.add_argument("--seed", type=lambda s: int(s, 0), default=None, + help="override DEVOURER_STREAM_SEED") + ap.add_argument("--offset", type=int, default=None, + help="override DEVOURER_STREAM_OFFSET") + ap.add_argument("--entry-state", type=lambda s: int(s, 0), default=None, + help="override DEVOURER_STREAM_ENTRY_STATE") + ap.add_argument("--realtime", action="store_true", + help="emit decoded payloads as they arrive (no reorder)") + ap.add_argument("--total", type=int, default=0, + help="stop after N distinct frames (default: rely on the " + "first frame's TOTAL field, fall back to EOF)") + ap.add_argument("--idle-timeout", type=float, default=0.0, + help="exit when no new frame for this many seconds " + "(default 0 = no timeout)") + args = ap.parse_args(argv) + + shape_raw = args.shape if args.shape is not None else os.environ.get( + "DEVOURER_STREAM_SHAPE", "") + shape = parse_shape_env(shape_raw) + seed = args.seed if args.seed is not None else _env_int( + "DEVOURER_STREAM_SEED", stream.DEFAULT_SEED) + offset = args.offset if args.offset is not None else _env_int( + "DEVOURER_STREAM_OFFSET", 0) + entry_state = args.entry_state if args.entry_state is not None else _env_int( + "DEVOURER_STREAM_ENTRY_STATE", 0) + + by_seq: dict[int, bytes] = {} + total_target = args.total + rx_total = 0 + dup_count = 0 + bad_count = 0 + rate_mismatch = 0 + last_event = time.monotonic() + + out_bytes = sys.stdout.buffer + + for line in sys.stdin: + m = _STREAM_RE.search(line) + if not m: + continue + rate = int(m.group("rate")) + if rate != 0x04: + # Tier-1 sanity: every shaped frame must fly as legacy 6M OFDM. A + # 0x00 here means the chip downgraded us to 1M CCK and OFDM + # subcarriers don't exist at all (see precoder README). + rate_mismatch += 1 + body = bytes.fromhex(m.group("hex")) + last_event = time.monotonic() + + frame = stream.decode_body( + body, shape=shape, seed=seed, offset=offset, entry_state=entry_state, + ) + if frame is None: + bad_count += 1 + continue + if frame.seq in by_seq: + dup_count += 1 + continue + by_seq[frame.seq] = frame.payload + rx_total += 1 + if total_target == 0 and frame.total: + total_target = frame.total + if args.realtime: + out_bytes.write(frame.payload) + out_bytes.flush() + + if total_target and rx_total >= total_target: + break + if args.idle_timeout and (time.monotonic() - last_event) > args.idle_timeout: + break + + if not args.realtime: + # Emit in ascending-seq order. Gaps don't stop us — we write what we + # have and report the gap on stderr; caller decides whether to retry. + for seq in sorted(by_seq): + out_bytes.write(by_seq[seq]) + out_bytes.flush() + + # Report. + seen = sorted(by_seq) + gaps: list[tuple[int, int]] = [] + if seen: + for i in range(seen[0], seen[-1] + 1): + if i not in by_seq: + if gaps and gaps[-1][1] == i - 1: + gaps[-1] = (gaps[-1][0], i) + else: + gaps.append((i, i)) + sys.stderr.write( + f"stream_rx: rx={rx_total} unique frame(s), " + f"dup={dup_count}, malformed={bad_count}, " + f"rate-mismatch={rate_mismatch}" + ) + if total_target: + sys.stderr.write(f", target={total_target}") + if gaps: + sys.stderr.write( + ", gaps=" + ",".join( + f"{a}" if a == b else f"{a}-{b}" for a, b in gaps[:8] + ) + + ("…" if len(gaps) > 8 else "") + ) + sys.stderr.write("\n") + return 0 if (rx_total > 0 and not gaps and rate_mismatch == 0) else 1 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tools/precoder/stream_tx.py b/tools/precoder/stream_tx.py new file mode 100644 index 0000000..5aefce8 --- /dev/null +++ b/tools/precoder/stream_tx.py @@ -0,0 +1,178 @@ +#!/usr/bin/env python3 +"""Stream TX driver — wraps a byte stream into precoded PSDU bodies. + +Reads bytes from stdin (or --input), chunks them via `stream.pack_stream`, +encodes each frame via `stream.encode_body` (optionally with a shape +constraint sourced from the environment), and writes a sequence of +length-prefixed PSDU bodies to stdout. Piped into the C++ StreamTxDemo: + + python3 tools/precoder/stream_tx.py < data.bin | ./build/StreamTxDemo + +Wire format on stdout: + +Environment knobs (all optional): + DEVOURER_STREAM_SHAPE e.g. "0:+1,10:-1,20:+1" — per-subcarrier ±1 + pin spec; absent / "" = byte mode. + DEVOURER_STREAM_MTU payload bytes per frame (default 14, matches + a 24-byte byte-mode body). + DEVOURER_STREAM_BODY_BYTES floor for body size (default 24); shape mode + may grow the body past this when the pin + rank reduces per-symbol capacity. + DEVOURER_STREAM_SEED scrambler seed for the *encoder's* model + (default 0x5D). Must match the chip's actual + seed to make the shaped subcarriers appear + on-air at the requested ±1; for byte-only + use the default works. + DEVOURER_STREAM_OFFSET scrambler-phase offset for the body's first + bit (default 0; the README's PrecoderDemo + placement is 208 = SERVICE(16) + MAC(24·8)). + Byte mode ignores this. + DEVOURER_STREAM_ENTRY_STATE BCC entry state at the body (default 0). + Byte mode ignores this. + DEVOURER_STREAM_SEQ_START starting sequence number (default 0). +""" + +from __future__ import annotations + +import argparse +import os +import struct +import sys +from typing import Optional + +# Allow running both via `uv run python tools/precoder/stream_tx.py` +# and directly from inside tools/precoder. When invoked from the repo root, +# add this script's directory to sys.path so `import stream` resolves. +_HERE = os.path.dirname(os.path.abspath(__file__)) +if _HERE not in sys.path: + sys.path.insert(0, _HERE) + +import stream # noqa: E402 + + +def parse_shape_env(s: str) -> Optional[dict]: + """Parse "idx:±1,..." into {idx: ±1}. Empty / None → None (byte mode).""" + if not s: + return None + out: dict[int, int] = {} + for tok in s.split(","): + tok = tok.strip() + if not tok: + continue + if ":" not in tok: + raise ValueError(f"shape token {tok!r} missing ':' (want idx:±1)") + k, v = tok.split(":", 1) + k = int(k.strip(), 0) + v = v.strip() + # Accept +1 / -1 / + / - / 1 / -1. + if v in ("+1", "+", "1"): + sign = +1 + elif v in ("-1", "-"): + sign = -1 + else: + raise ValueError(f"shape value {v!r} (token {tok!r}); want ±1") + if k in out: + raise ValueError(f"duplicate subcarrier index {k} in shape") + out[k] = sign + return out or None + + +def _env_int(name: str, default: int) -> int: + raw = os.environ.get(name) + if raw is None or raw == "": + return default + return int(raw, 0) + + +def main(argv: Optional[list[str]] = None) -> int: + ap = argparse.ArgumentParser( + description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + ap.add_argument("--input", default=None, + help="read bytes from this file instead of stdin") + ap.add_argument("--shape", default=None, + help="override DEVOURER_STREAM_SHAPE") + ap.add_argument("--mtu", type=int, default=None, + help="override DEVOURER_STREAM_MTU") + ap.add_argument("--body-bytes", type=int, default=None, + help="override DEVOURER_STREAM_BODY_BYTES") + ap.add_argument("--seed", type=lambda s: int(s, 0), default=None, + help="override DEVOURER_STREAM_SEED") + ap.add_argument("--offset", type=int, default=None, + help="override DEVOURER_STREAM_OFFSET") + ap.add_argument("--entry-state", type=lambda s: int(s, 0), default=None, + help="override DEVOURER_STREAM_ENTRY_STATE") + ap.add_argument("--seq-start", type=int, default=None, + help="override DEVOURER_STREAM_SEQ_START") + ap.add_argument("--repeat", type=int, default=1, + help="emit each encoded body this many times in sequence " + "(RX dedups by seq). Combats early-frame loss during " + "the RX adapter's warmup period.") + ap.add_argument("--dump-bodies", default=None, + help="also write the concatenated bodies (without length " + "prefixes) to this path — for offline inspection") + args = ap.parse_args(argv) + + shape_raw = args.shape if args.shape is not None else os.environ.get( + "DEVOURER_STREAM_SHAPE", "") + shape = parse_shape_env(shape_raw) + mtu = args.mtu if args.mtu is not None else _env_int( + "DEVOURER_STREAM_MTU", stream.DEFAULT_BODY_BYTES - stream.ENVELOPE_LEN) + body_bytes = (args.body_bytes if args.body_bytes is not None + else _env_int("DEVOURER_STREAM_BODY_BYTES", + stream.DEFAULT_BODY_BYTES)) + seed = args.seed if args.seed is not None else _env_int( + "DEVOURER_STREAM_SEED", stream.DEFAULT_SEED) + offset = args.offset if args.offset is not None else _env_int( + "DEVOURER_STREAM_OFFSET", 0) + entry_state = args.entry_state if args.entry_state is not None else _env_int( + "DEVOURER_STREAM_ENTRY_STATE", 0) + seq_start = args.seq_start if args.seq_start is not None else _env_int( + "DEVOURER_STREAM_SEQ_START", 0) + + src = sys.stdin.buffer if args.input is None else open(args.input, "rb") + data = src.read() + if args.input is not None: + src.close() + + frames = stream.pack_stream(data, mtu=mtu, seq_start=seq_start) + if not frames: + sys.stderr.write("stream_tx: 0 bytes on input — no frames emitted\n") + return 0 + + out = sys.stdout.buffer + dump = open(args.dump_bodies, "wb") if args.dump_bodies else None + + repeat = max(1, args.repeat) + total_bytes = 0 + for f in frames: + body, layout = stream.encode_body( + f, shape=shape, body_bytes=body_bytes, + seed=seed, offset=offset, entry_state=entry_state, + ) + chunk = struct.pack(" 0x29B1. + assert stream.crc16_ccitt(b"123456789") == 0x29B1 + + +def test_crc16_ccitt_empty_is_init(): + assert stream.crc16_ccitt(b"") == 0xFFFF + + +def test_crc16_ccitt_detects_single_bit_flip(): + rng = np.random.default_rng(0) + data = bytes(rng.integers(0, 256, size=64, dtype=np.uint8).tolist()) + bad = bytearray(data) + bad[7] ^= 0x10 + assert stream.crc16_ccitt(data) != stream.crc16_ccitt(bytes(bad)) + + +# --------------------------------------------------------------------------- # +# Generator matrix +# --------------------------------------------------------------------------- # +def test_generator_matrix_shape(): + M = stream.compute_generator(PHY) + assert M.shape == (PHY.n_cbps, PHY.n_dbps) + assert M.dtype == np.uint8 + assert set(np.unique(M).tolist()) <= {0, 1} + + +def test_generator_matrix_is_bcc_jacobian(): + # M @ info ⊕ b(entry_state) == interleave(bcc_encode(info, entry_state)) + rng = np.random.default_rng(1) + M = stream.compute_generator(PHY) + for entry_state in (0, 7, 0x2A, 0x3F): + b = stream.compute_state_offset(entry_state, PHY) + for _ in range(5): + info = rng.integers(0, 2, size=PHY.n_dbps, dtype=np.uint8) + coded = enc.bcc_encode(info, init_state=entry_state) + sub_direct = enc.interleave(coded, PHY) + sub_via_M = ((M @ info) ^ b) & 1 + assert np.array_equal(sub_direct, sub_via_M) + + +def test_state_offset_state_zero_is_zero(): + # bcc_encode(0, state=0) -> 0; interleave(0) -> 0. + assert not stream.compute_state_offset(0, PHY).any() + + +# --------------------------------------------------------------------------- # +# GF(2) solver +# --------------------------------------------------------------------------- # +def test_gf2_solve_empty_constraints_returns_identity_basis(): + A = np.zeros((0, 5), dtype=np.uint8) + b = np.zeros(0, dtype=np.uint8) + sol = stream.gf2_solve(A, b) + assert sol.consistent + assert sol.n_free == 5 + assert np.array_equal(sol.null_basis, np.eye(5, dtype=np.uint8)) + assert np.array_equal(sol.free_cols, np.arange(5)) + + +def test_gf2_solve_particular_satisfies_constraints(): + rng = np.random.default_rng(2) + for _ in range(20): + m = int(rng.integers(1, 20)) + n = int(rng.integers(m, m + 8)) + A = rng.integers(0, 2, size=(m, n), dtype=np.uint8) + x = rng.integers(0, 2, size=n, dtype=np.uint8) + b = (A @ x) & 1 + sol = stream.gf2_solve(A, b) + assert sol.consistent + assert np.array_equal((A @ sol.particular) & 1, b) + + +def test_gf2_solve_null_basis_is_in_kernel(): + rng = np.random.default_rng(3) + A = rng.integers(0, 2, size=(10, 24), dtype=np.uint8) + b = rng.integers(0, 2, size=10, dtype=np.uint8) + sol = stream.gf2_solve(A, b) + assert sol.consistent + for j in range(sol.n_free): + assert not ((A @ sol.null_basis[:, j]) & 1).any() + + +def test_gf2_solve_free_cols_convention_round_trip(): + # data[j] = (particular ⊕ null @ data)[free_cols[j]] ⊕ particular[free_cols[j]] + rng = np.random.default_rng(4) + A = rng.integers(0, 2, size=(12, 24), dtype=np.uint8) + b = rng.integers(0, 2, size=12, dtype=np.uint8) + sol = stream.gf2_solve(A, b) + assert sol.consistent + data = rng.integers(0, 2, size=sol.n_free, dtype=np.uint8) + x = (sol.particular ^ ((sol.null_basis @ data) & 1)).astype(np.uint8) + delta = (x ^ sol.particular) & 1 + recovered = delta[sol.free_cols].astype(np.uint8) + assert np.array_equal(recovered, data) + + +def test_gf2_solve_detects_inconsistency(): + A = np.array([[1, 1], [1, 1]], dtype=np.uint8) + b = np.array([0, 1], dtype=np.uint8) + sol = stream.gf2_solve(A, b) + assert not sol.consistent + + +def test_gf2_rank_matches_dim(): + rng = np.random.default_rng(5) + A = rng.integers(0, 2, size=(20, 24), dtype=np.uint8) + r = stream.gf2_rank(A) + sol = stream.gf2_solve(A, np.zeros(A.shape[0], dtype=np.uint8)) + assert r == A.shape[1] - sol.n_free + assert 0 <= r <= min(A.shape) + + +# --------------------------------------------------------------------------- # +# Shape normalisation +# --------------------------------------------------------------------------- # +def test_normalise_shape_dict_and_array_agree(): + arr = np.zeros(PHY.n_sd, dtype=np.int8) + arr[3] = 1 + arr[17] = -1 + arr[40] = 1 + idx_a, vals_a = stream.normalise_shape(arr, PHY.n_sd) + idx_d, vals_d = stream.normalise_shape({3: 1, 17: -1, 40: 1}, PHY.n_sd) + assert np.array_equal(idx_a, idx_d) + assert np.array_equal(vals_a, vals_d) + + +def test_normalise_shape_rejects_bad_values(): + with pytest.raises(ValueError): + stream.normalise_shape({0: 2}, PHY.n_sd) + with pytest.raises(ValueError): + stream.normalise_shape({999: 1}, PHY.n_sd) + + +def test_normalise_shape_none_is_empty(): + idx, vals = stream.normalise_shape(None, PHY.n_sd) + assert idx.size == 0 and vals.size == 0 + + +# --------------------------------------------------------------------------- # +# Envelope round-trip +# --------------------------------------------------------------------------- # +def test_envelope_round_trip_default_body(): + f = stream.StreamFrame(seq=0x1234, total=7, payload=b"hello, world!!") + body = f.envelope_bytes(stream.DEFAULT_BODY_BYTES) + assert len(body) == stream.DEFAULT_BODY_BYTES + out = stream.parse_envelope(body) + assert out == f + + +def test_envelope_rejects_bad_magic(): + f = stream.StreamFrame(seq=1, total=1, payload=b"abc") + body = bytearray(f.envelope_bytes(24)) + body[0] ^= 0xFF + assert stream.parse_envelope(bytes(body)) is None + + +def test_envelope_rejects_bad_crc(): + f = stream.StreamFrame(seq=1, total=1, payload=b"abc") + body = bytearray(f.envelope_bytes(24)) + # CRC sits at HEADER_LEN + plen .. HEADER_LEN + plen + TRAILER_LEN; the + # trailing pad after it is ignored by the parser. Flip a CRC byte to + # isolate the CRC check from the (separately-tested) trailer behaviour. + body[stream.HEADER_LEN + len(f.payload)] ^= 0x01 + assert stream.parse_envelope(bytes(body)) is None + + +def test_envelope_ignores_trailing_chip_bytes(): + # Simulates the chip-side body dump: 4-byte FCS + assorted trailer past + # the envelope. The parser must still recover the frame from PLEN alone. + f = stream.StreamFrame(seq=0x4242, total=9, payload=b"hello stream!!") + body = f.envelope_bytes(24) + chip_view = body + b"\xde\xad\xbe\xef" + b"\x00" * 24 + out = stream.parse_envelope(chip_view) + assert out == f + + +def test_envelope_rejects_oversized_plen(): + f = stream.StreamFrame(seq=0, total=0, payload=b"x" * 4) + body = bytearray(f.envelope_bytes(24)) + # Bump PLEN past the body size, recompute CRC so we isolate the PLEN check. + body[6:8] = (200).to_bytes(2, "little") + crc = stream.crc16_ccitt(bytes(body[:-2])) + body[-2:] = crc.to_bytes(2, "little") + assert stream.parse_envelope(bytes(body)) is None + + +def test_envelope_rejects_oversized_payload(): + f = stream.StreamFrame(seq=0, total=0, payload=b"x" * 100) + with pytest.raises(ValueError): + f.envelope_bytes(24) + + +# --------------------------------------------------------------------------- # +# Byte-mode encode / decode +# --------------------------------------------------------------------------- # +def test_byte_mode_encode_decode_round_trip(): + f = stream.StreamFrame(seq=42, total=3, payload=b"streamtest!!aa") + body, layout = stream.encode_body(f) + assert layout.body_bytes == stream.DEFAULT_BODY_BYTES + assert layout.capacity_per_sym == PHY.n_dbps + assert layout.payload_capacity == stream.DEFAULT_BODY_BYTES - stream.ENVELOPE_LEN + out = stream.decode_body(body) + assert out == f + + +def test_byte_mode_respects_custom_body_size(): + f = stream.StreamFrame(seq=1, total=1, payload=b"") + body, layout = stream.encode_body(f, body_bytes=48) + assert layout.body_bytes == 48 + assert layout.n_sym == 16 # 48 bytes / 3 bytes per legacy OFDM symbol + assert stream.decode_body(body) == f + + +def test_byte_mode_rejects_misaligned_body_size(): + f = stream.StreamFrame(seq=0, total=0, payload=b"") + with pytest.raises(ValueError): + stream.encode_body(f, body_bytes=25) # not a multiple of 3 bytes/symbol + + +# --------------------------------------------------------------------------- # +# Shape-mode encode / decode + on-air subcarrier verification +# --------------------------------------------------------------------------- # +def _check_shape_honoured(body: bytes, shape: dict, seed: int, + offset: int, entry_state: int, n_sym: int): + """Feed `body` through emulate_chip and verify the pinned subcarriers + match the requested ±1 values for every body symbol.""" + psdu_bits = enc.bytes_to_bits(body)[: n_sym * PHY.n_dbps] + sub = enc.emulate_chip(psdu_bits, seed, PHY, n_sym, + offset=offset, entry_state=entry_state) + for sc, want in shape.items(): + assert np.all(sub[:, sc] == want), \ + f"subcarrier {sc}: want {want}, got {sub[:, sc].tolist()}" + + +def test_shape_mode_round_trip_simple_pin(): + f = stream.StreamFrame(seq=0xABCD, total=2, payload=b"abc") + shape = {0: +1, 10: -1, 20: +1} + body, layout = stream.encode_body(f, shape=shape) + assert layout.capacity_per_sym == PHY.n_dbps - len(shape) + assert layout.n_sym >= (stream.ENVELOPE_LEN + 3) * 8 // layout.capacity_per_sym + out = stream.decode_body(body, shape=shape) + assert out == f + _check_shape_honoured(body, shape, enc.DEFAULT_SEED, 0, 0, layout.n_sym) + + +def test_shape_mode_full_array_spec(): + arr = np.zeros(PHY.n_sd, dtype=np.int8) + arr[5] = +1 + arr[6] = -1 + arr[30] = +1 + f = stream.StreamFrame(seq=1, total=1, payload=b"x") + body, layout = stream.encode_body(f, shape=arr) + out = stream.decode_body(body, shape=arr) + assert out == f + shape_dict = {int(i): int(v) for i, v in zip(np.nonzero(arr)[0], arr[arr != 0])} + _check_shape_honoured(body, shape_dict, enc.DEFAULT_SEED, 0, 0, layout.n_sym) + + +def test_shape_mode_alternating_quarter_pattern(): + # Pin every fourth subcarrier on an alternating ±1 (12 constraints). + shape = {k: (+1 if (k // 4) % 2 == 0 else -1) for k in range(0, PHY.n_sd, 4)} + f = stream.StreamFrame(seq=9, total=10, payload=b"hi") + body, layout = stream.encode_body(f, shape=shape) + assert layout.capacity_per_sym == PHY.n_dbps - stream.gf2_rank( + stream.compute_generator(PHY)[np.array(sorted(shape.keys())), :] + ) + out = stream.decode_body(body, shape=shape) + assert out == f + _check_shape_honoured(body, shape, enc.DEFAULT_SEED, 0, 0, layout.n_sym) + + +def test_shape_mode_mid_frame_offset_and_entry_state(): + # The README's body offset for PrecoderDemo: SERVICE(16) + 24B header. + rng = np.random.default_rng(6) + prefix = rng.integers(0, 2, size=16 + 24 * 8, dtype=np.uint8) + entry_state = enc.bcc_final_state(prefix) + offset = len(prefix) + + shape = {2: -1, 14: +1, 33: -1} + f = stream.StreamFrame(seq=7, total=0, payload=b"mid") + body, layout = stream.encode_body( + f, shape=shape, offset=offset, entry_state=entry_state + ) + out = stream.decode_body( + body, shape=shape, offset=offset, entry_state=entry_state + ) + assert out == f + _check_shape_honoured(body, shape, enc.DEFAULT_SEED, offset, entry_state, + layout.n_sym) + + +def test_shape_mode_rejects_inconsistent_pins(): + # Pin two subcarriers that share the same coded-bit row of M to opposite + # values — the constraint becomes 0 = 1 mod 2 at that row. + M = stream.compute_generator(PHY) + # find two rows of M that are identical (extremely common for BCC+interleaver). + pairs = [] + for i in range(M.shape[0]): + for j in range(i + 1, M.shape[0]): + if np.array_equal(M[i, :], M[j, :]): + # `b(entry_state)` evaluated at state=0 is zero, so for a pair + # of subcarriers with the same M-row the chip emits + # sub[i] == sub[j] when scrambler XOR is also equal at those + # subcarrier positions. We force inconsistency at the M[S,:] + # level by requesting opposite ±1: rhs becomes 0 vs 1. + pairs.append((i, j)) + break + if pairs: + break + if not pairs: + pytest.skip("M has no duplicate rows — no easy inconsistency to force") + i, j = pairs[0] + shape = {i: +1, j: -1} + f = stream.StreamFrame(seq=0, total=0, payload=b"") + with pytest.raises(ValueError): + stream.encode_body(f, shape=shape) + + +def test_shape_mode_full_rank_pin_is_rejected(): + # Pin enough subcarriers to drive rank to 24 — no room for framing bits. + M = stream.compute_generator(PHY) + # Greedy independent-row pick to reach rank 24. + chosen: list[int] = [] + A = np.zeros((0, PHY.n_dbps), dtype=np.uint8) + for r in range(M.shape[0]): + cand = np.vstack([A, M[r, :]]) + if stream.gf2_rank(cand) > stream.gf2_rank(A): + chosen.append(r) + A = cand + if stream.gf2_rank(A) == PHY.n_dbps: + break + assert stream.gf2_rank(A) == PHY.n_dbps + shape = {sc: +1 for sc in chosen} + f = stream.StreamFrame(seq=0, total=0, payload=b"") + with pytest.raises(ValueError, match="rank=24"): + stream.encode_body(f, shape=shape) + + +# --------------------------------------------------------------------------- # +# Stream helpers +# --------------------------------------------------------------------------- # +def test_pack_stream_chunks_at_mtu(): + data = bytes(range(50)) + frames = stream.pack_stream(data, mtu=14) + assert len(frames) == 4 # 14 + 14 + 14 + 8 + assert frames[0].seq == 0 and frames[-1].seq == 3 + assert all(f.total == 4 for f in frames) + assert b"".join(f.payload for f in frames) == data + + +def test_pack_stream_seq_start_and_wraparound(): + data = bytes(range(30)) + frames = stream.pack_stream(data, mtu=14, seq_start=0xFFFE) + assert [f.seq for f in frames] == [0xFFFE, 0xFFFF, 0x0000] + + +def test_pack_stream_empty(): + assert stream.pack_stream(b"", mtu=14) == [] + + +def test_unpack_stream_drops_duplicates_and_stops_at_total(): + frames = [ + stream.StreamFrame(seq=0, total=3, payload=b"AAA"), + stream.StreamFrame(seq=2, total=3, payload=b"CCC"), + stream.StreamFrame(seq=0, total=3, payload=b"AAA"), # dup + stream.StreamFrame(seq=1, total=3, payload=b"BBB"), + stream.StreamFrame(seq=4, total=3, payload=b"ignored"), # past total + ] + out = list(stream.unpack_stream(frames)) + assert out == [(0, b"AAA"), (2, b"CCC"), (1, b"BBB")] + + +def test_pack_unpack_round_trip_reorders_by_seq_at_caller(): + data = bytes(range(64)) + frames = stream.pack_stream(data, mtu=14) + received = sorted(stream.unpack_stream(frames), key=lambda kv: kv[0]) + assert b"".join(payload for _, payload in received) == data + + +# --------------------------------------------------------------------------- # +# End-to-end: stream over byte-mode bodies +# --------------------------------------------------------------------------- # +def test_byte_mode_end_to_end_byte_stream(): + rng = np.random.default_rng(7) + data = bytes(rng.integers(0, 256, size=200, dtype=np.uint8).tolist()) + frames = stream.pack_stream(data) + bodies = [stream.encode_body(f)[0] for f in frames] + decoded = [stream.decode_body(b) for b in bodies] + assert all(d is not None for d in decoded) + out = sorted(((d.seq, d.payload) for d in decoded), key=lambda kv: kv[0]) + assert b"".join(p for _, p in out) == data + + +def test_shape_mode_end_to_end_byte_stream(): + rng = np.random.default_rng(8) + data = bytes(rng.integers(0, 256, size=120, dtype=np.uint8).tolist()) + shape = {0: +1, 8: -1, 16: +1, 24: -1, 32: +1} + frames = stream.pack_stream(data) + bodies_and_layouts = [stream.encode_body(f, shape=shape) for f in frames] + decoded = [stream.decode_body(b, shape=shape) for b, _ in bodies_and_layouts] + assert all(d is not None for d in decoded) + out = sorted(((d.seq, d.payload) for d in decoded), key=lambda kv: kv[0]) + assert b"".join(p for _, p in out) == data + # And every body honours the shape on-air. + for (body, layout), f in zip(bodies_and_layouts, frames): + _check_shape_honoured(body, shape, enc.DEFAULT_SEED, 0, 0, layout.n_sym) diff --git a/txdemo/stream_tx_demo/main.cpp b/txdemo/stream_tx_demo/main.cpp new file mode 100644 index 0000000..026c4ba --- /dev/null +++ b/txdemo/stream_tx_demo/main.cpp @@ -0,0 +1,264 @@ +// StreamTxDemo — stdin-driven TX for the precoder stream link. +// +// Mirrors PrecoderDemo's chip-setup boilerplate (legacy 6M OFDM probe-request +// carrier, single-stream BPSK/BCC, RTL8812AU/8821AU/8811AU), but instead of +// looping on one shaped PSDU it reads a sequence of length-prefixed PSDU +// bodies from stdin and sends one probe-request per body. The encoder +// (tools/precoder/stream_tx.py) drives this binary; the two are intentionally +// split so the C++ side stays USB-only and the framing math stays in Python. +// +// On-wire frame protocol (stdin): +// +// EOF on stdin = orderly shutdown. +// +// Why "descrambled" body bytes: the Realtek chip applies its own scrambler +// before BCC. So the bytes we hand to `send_packet` are the bits the chip +// will scramble — i.e. the bits the encoder produced as `descramble(...)`'s +// pre-image. Symmetric on RX: DEVOURER_DUMP_BODY / DEVOURER_STREAM_OUT print +// what the chip has already descrambled. The byte stream is the same on both +// ends. +// +// Usage: +// DEVOURER_PID=0x8812 DEVOURER_CHANNEL=6 ./build/StreamTxDemo \\ +// [--interval-ms MS] [--max-psdu BYTES] < bodies.bin +// uv run python tools/precoder/stream_tx.py < data.bin | \\ +// ./build/StreamTxDemo +// +// Env: same conventions as the other demos (DEVOURER_VID / DEVOURER_PID / +// DEVOURER_CHANNEL / DEVOURER_SKIP_RESET). + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#if defined(_MSC_VER) + #include + #include + #include + #include + typedef int pid_t; + #define sleep(seconds) Sleep((seconds)*1000) +#elif defined(__ANDROID__) + #include + #include +#elif defined(__APPLE__) + #include + #include +#else + #include + #include +#endif + +#include "FrameParser.h" +#include "RtlUsbAdapter.h" +#include "WiFiDriver.h" +#include "logger.h" + +#define USB_VENDOR_ID 0x0bda + +static constexpr uint16_t kRealtekProductIds[] = { + 0x8812, 0x0811, 0xa811, 0xb811, 0x8813, +}; + +// Identical legacy-6M radiotap + 802.11 probe-request header to PrecoderDemo. +// Same canonical SA, same matcher in demo/main.cpp's RX path. Keep these +// three in lockstep — see CLAUDE.md. +static const uint8_t kRadiotapLegacy6M[13] = { + 0x00, 0x00, 0x0d, 0x00, 0x04, 0x80, 0x00, + 0x00, 0x0c, 0x00, 0x08, 0x00, 0x00}; +static const uint8_t kCanonicalSa[6] = {0x57, 0x42, 0x75, 0x05, 0xd6, 0x00}; + +static std::vector build_dot11_probe_req() { + std::vector h = { + 0x40, 0x00, 0x00, 0x00, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + }; + h.insert(h.end(), kCanonicalSa, kCanonicalSa + 6); + h.insert(h.end(), kCanonicalSa, kCanonicalSa + 6); + h.push_back(0x80); + h.push_back(0x00); + return h; +} + +// Read exactly `n` bytes from FILE *f into buf. Returns: +// true -> got all n bytes +// false -> clean EOF before any bytes (orderly stdin close) +// Terminates on short read mid-record (a stream framing bug we'd rather see). +static bool read_exact(FILE *f, void *buf, size_t n) { + size_t got = 0; + auto *p = static_cast(buf); + while (got < n) { + size_t r = std::fread(p + got, 1, n - got, f); + if (r == 0) { + if (got == 0 && std::feof(f)) return false; + std::fprintf(stderr, + "stream_tx_demo: short read on stdin (%zu/%zu); record " + "truncated mid-PSDU\n", got, n); + std::exit(2); + } + got += r; + } + return true; +} + +int main(int argc, char **argv) { + auto logger = std::make_shared(); + + int interval_ms = 2; + // Sanity cap on a single PSDU body; protects against an upstream framing + // bug that would otherwise have us allocate gigabytes from a stray length + // prefix. 4096 covers any realistic legacy-6M probe-request payload. + size_t max_psdu = 4096; + long termux_fd = 0; + for (int i = 1; i < argc; ++i) { + std::string a = argv[i]; + if (a == "--interval-ms" && i + 1 < argc) { + interval_ms = std::atoi(argv[++i]); + } else if (a == "--max-psdu" && i + 1 < argc) { + max_psdu = static_cast(std::strtoul(argv[++i], nullptr, 0)); + } else { + char *end = nullptr; + long v = std::strtol(a.c_str(), &end, 0); + if (end && *end == '\0' && v > 0) termux_fd = v; + } + } + +#if defined(_MSC_VER) + // Make stdin binary so a 0x1A or CRLF doesn't corrupt PSDU bytes. + _setmode(_fileno(stdin), _O_BINARY); +#endif + + libusb_context *context = nullptr; + libusb_device_handle *handle = nullptr; + int rc; + + if (termux_fd > 0) { + logger->info("Termux mode: wrapping fd {}", termux_fd); + libusb_set_option(NULL, LIBUSB_OPTION_NO_DEVICE_DISCOVERY); + libusb_set_option(NULL, LIBUSB_OPTION_WEAK_AUTHORITY); + libusb_init(&context); + rc = libusb_wrap_sys_device(context, (intptr_t)termux_fd, &handle); + if (rc < 0) { + logger->error("libusb_wrap_sys_device: {}", rc); + return 1; + } + } else { + rc = libusb_init(&context); + if (rc < 0) return rc; + + uint16_t target_pid = 0; + if (const char *pid_env = std::getenv("DEVOURER_PID")) { + target_pid = static_cast(std::strtoul(pid_env, nullptr, 0)); + logger->info("DEVOURER_PID={:04x} (limiting to this PID)", target_pid); + } + uint16_t target_vid = USB_VENDOR_ID; + if (const char *vid_env = std::getenv("DEVOURER_VID")) { + target_vid = static_cast(std::strtoul(vid_env, nullptr, 0)); + } + for (uint16_t pid : kRealtekProductIds) { + if (target_pid != 0 && pid != target_pid) continue; + handle = libusb_open_device_with_vid_pid(context, target_vid, pid); + if (handle != NULL) { + logger->info("Opened device {:04x}:{:04x}", target_vid, pid); + break; + } + } + if (handle == NULL && target_pid != 0) { + handle = libusb_open_device_with_vid_pid(context, target_vid, target_pid); + } + if (handle == NULL) { + logger->error("No supported device found under VID {:04x}", target_vid); + libusb_exit(context); + return 1; + } + } + + if (libusb_kernel_driver_active(handle, 0)) { + rc = libusb_detach_kernel_driver(handle, 0); + if (rc != 0) logger->error("libusb_detach_kernel_driver: {}", rc); + } + if (termux_fd == 0 && !std::getenv("DEVOURER_SKIP_RESET")) { + libusb_reset_device(handle); + } + rc = libusb_claim_interface(handle, 0); + assert(rc == 0); + + WiFiDriver wifi_driver{logger}; + auto rtlDevice = wifi_driver.CreateRtlDevice(handle); + + int channel = 6; + if (const char *ch_env = std::getenv("DEVOURER_CHANNEL")) { + channel = std::atoi(ch_env); + logger->info("DEVOURER_CHANNEL set — tuning TX to channel {}", channel); + } + + rtlDevice->SetTxPower(40); + rtlDevice->InitWrite(SelectedChannel{.Channel = static_cast(channel), + .ChannelOffset = 0, + .ChannelWidth = CHANNEL_WIDTH_20}); + sleep(2); + + auto dot11 = build_dot11_probe_req(); + std::vector tx_buf; + tx_buf.reserve(sizeof(kRadiotapLegacy6M) + dot11.size() + max_psdu); + + logger->info( + "stream TX ready (legacy 6M OFDM, ch {}); reading length-prefixed PSDUs " + "from stdin", channel); + + long tx_count = 0; + while (true) { + uint8_t len_bytes[4]; + if (!read_exact(stdin, len_bytes, sizeof(len_bytes))) break; // clean EOF + uint32_t len = static_cast(len_bytes[0]) + | (static_cast(len_bytes[1]) << 8) + | (static_cast(len_bytes[2]) << 16) + | (static_cast(len_bytes[3]) << 24); + if (len == 0 || len > max_psdu) { + std::fprintf(stderr, + "stream_tx_demo: PSDU length %u out of range (max %zu); " + "stopping\n", len, max_psdu); + break; + } + std::vector psdu(len); + if (!read_exact(stdin, psdu.data(), len)) { + std::fprintf(stderr, + "stream_tx_demo: EOF mid-PSDU (expected %u bytes)\n", len); + break; + } + + tx_buf.clear(); + tx_buf.insert(tx_buf.end(), kRadiotapLegacy6M, + kRadiotapLegacy6M + sizeof(kRadiotapLegacy6M)); + tx_buf.insert(tx_buf.end(), dot11.begin(), dot11.end()); + tx_buf.insert(tx_buf.end(), psdu.begin(), psdu.end()); + bool ok = rtlDevice->send_packet(tx_buf.data(), tx_buf.size()); + ++tx_count; + // Tag matches PrecoderDemo's so existing log-watchers keep working; route + // to stderr to leave stdout clean for downstream callers that may chain + // this binary. + if (tx_count <= 5 || tx_count % 500 == 0) { + std::fprintf(stderr, + "TX #%ld ok=%d psdu=%u total=%zu\n", + tx_count, ok ? 1 : 0, len, tx_buf.size()); + std::fflush(stderr); + } + if (interval_ms > 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(interval_ms)); + } + } + + std::fprintf(stderr, "done; sent %ld PSDUs\n", tx_count); + libusb_release_interface(handle, 0); + libusb_close(handle); + libusb_exit(context); + return 0; +}