diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9d906b2..9f7882d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -24,7 +24,7 @@ jobs: cache: pip - name: Install dependencies - run: pip install -r requirements.txt ruff mypy pytest + run: pip install -r requirements-dev.txt - name: Lint run: ruff check clipsync/ tests/ diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 49baac7..8b28674 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -51,7 +51,7 @@ jobs: - name: Install Python deps run: | python -m pip install --upgrade pip - pip install -r requirements.txt + pip install -r requirements-dev.txt - name: Build run: python -m clipsync.build diff --git a/clipsync/clipboard.py b/clipsync/clipboard.py index a065807..0792a8c 100644 --- a/clipsync/clipboard.py +++ b/clipsync/clipboard.py @@ -23,6 +23,7 @@ from __future__ import annotations +import importlib.util import io import logging import os @@ -36,6 +37,7 @@ import pyperclip from watchdog.events import FileSystemEvent, FileSystemEventHandler from watchdog.observers import Observer +from watchdog.observers.api import BaseObserver from . import config from .crypto import decrypt, encrypt, is_encrypted @@ -53,7 +55,7 @@ _STOP_SENTINEL = object() -def _try_start_xfixes_watcher() -> "queue.SimpleQueue[object] | None": +def _try_start_xfixes_watcher() -> queue.SimpleQueue[object] | None: """Start an X11 XFixes clipboard-owner watcher. Returns a SimpleQueue that receives a True value each time the CLIPBOARD @@ -69,8 +71,8 @@ def _try_start_xfixes_watcher() -> "queue.SimpleQueue[object] | None": user has actually copied something. """ try: - from Xlib import display # type: ignore[import] - from Xlib.protocol import rq # type: ignore[import] + from Xlib import display + from Xlib.protocol import rq except ImportError: return None @@ -89,29 +91,45 @@ def _try_start_xfixes_watcher() -> "queue.SimpleQueue[object] | None": # Inline minimal XFixes protocol definitions -- python-xlib 0.15 doesn't # ship an xfixes module, so we define only what we need here. - class _QueryVersion(rq.ReplyRequest): # type: ignore[misc] + class _QueryVersion(rq.ReplyRequest): _request = rq.Struct( - rq.Card8("opcode"), rq.Opcode(0), rq.RequestLength(), - rq.Card32("client_major"), rq.Card32("client_minor"), + rq.Card8("opcode"), + rq.Opcode(0), + rq.RequestLength(), + rq.Card32("client_major"), + rq.Card32("client_minor"), ) _reply = rq.Struct( - rq.ReplyCode(), rq.Pad(1), rq.Card16("sequence_number"), - rq.Card32("length"), rq.Card32("major_version"), - rq.Card32("minor_version"), rq.Pad(16), + rq.ReplyCode(), + rq.Pad(1), + rq.Card16("sequence_number"), + rq.Card32("length"), + rq.Card32("major_version"), + rq.Card32("minor_version"), + rq.Pad(16), ) - class _SelectSelectionInput(rq.Request): # type: ignore[misc] + class _SelectSelectionInput(rq.Request): _request = rq.Struct( - rq.Card8("opcode"), rq.Opcode(2), rq.RequestLength(), - rq.Window("window"), rq.Card32("selection"), rq.Card32("event_mask"), + rq.Card8("opcode"), + rq.Opcode(2), + rq.RequestLength(), + rq.Window("window"), + rq.Card32("selection"), + rq.Card32("event_mask"), ) - class _SelectionNotify(rq.Event): # type: ignore[misc] + class _SelectionNotify(rq.Event): _code = _first_event _fields = rq.Struct( - rq.Card8("type"), rq.Card8("subtype"), rq.Card16("sequence_number"), - rq.Window("window"), rq.Card32("selection"), rq.Card32("owner"), - rq.Card32("selection_timestamp"), rq.Card32("timestamp"), + rq.Card8("type"), + rq.Card8("subtype"), + rq.Card16("sequence_number"), + rq.Window("window"), + rq.Card32("selection"), + rq.Card32("owner"), + rq.Card32("selection_timestamp"), + rq.Card32("timestamp"), ) notify_q: queue.SimpleQueue[object] = queue.SimpleQueue() @@ -122,15 +140,19 @@ def _watch() -> None: d.display.extension_major_opcodes["XFIXES"] = _opcode d.display.add_extension_event(_first_event, _SelectionNotify) _QueryVersion( - display=d.display, opcode=_opcode, - client_major=5, client_minor=0, + display=d.display, + opcode=_opcode, + client_major=5, + client_minor=0, ) d.sync() root = d.screen().root clipboard_atom = d.intern_atom("CLIPBOARD") _SelectSelectionInput( - display=d.display, opcode=_opcode, - window=root, selection=clipboard_atom, + display=d.display, + opcode=_opcode, + window=root, + selection=clipboard_atom, event_mask=1, # SelectionSetOwnerMask ) d.flush() @@ -147,7 +169,7 @@ def _watch() -> None: return notify_q -def _try_start_xlib_clipboard_owner() -> "_XlibClipboardOwner | None": +def _try_start_xlib_clipboard_owner() -> _XlibClipboardOwner | None: """Try to create an in-process X11 clipboard owner using python-xlib. Returns None on Wayland, missing python-xlib, or any startup error. @@ -157,9 +179,7 @@ def _try_start_xlib_clipboard_owner() -> "_XlibClipboardOwner | None": - Has no ownership-transition gap (we own the selection immediately) - Responds to SelectionRequests in microseconds (single round trip) """ - try: - from Xlib import display # type: ignore[import] - except ImportError: + if importlib.util.find_spec("Xlib") is None: return None try: return _XlibClipboardOwner() @@ -177,7 +197,7 @@ class _XlibClipboardOwner: """ def __init__(self) -> None: - from Xlib import X, Xatom, display # type: ignore[import] + from Xlib import X, Xatom, display self._X = X self._d = display.Display() @@ -187,7 +207,7 @@ def __init__(self) -> None: self._UTF8 = self._d.intern_atom("UTF8_STRING") self._COMPOUND_TEXT = self._d.intern_atom("COMPOUND_TEXT") self._TARGETS = self._d.intern_atom("TARGETS") - self._XA_ATOM = Xatom.ATOM # type for lists of atoms (= 4) + self._XA_ATOM = Xatom.ATOM # type for lists of atoms (= 4) self._XA_STRING = Xatom.STRING # plain ASCII/Latin-1 string type (= 31) self._content: str | None = None @@ -196,9 +216,7 @@ def __init__(self) -> None: # Self-pipe: writing a byte wakes the event loop. self._pipe_r, self._pipe_w = os.pipe() - self._thread = threading.Thread( - target=self._event_loop, name="clipsync-xlib-owner", daemon=True - ) + self._thread = threading.Thread(target=self._event_loop, name="clipsync-xlib-owner", daemon=True) self._thread.start() def set(self, text: str) -> None: @@ -267,7 +285,7 @@ def _handle_event(self, event) -> None: log.debug("xlib clipboard: lost CLIPBOARD ownership (SelectionClear)") def _serve_request(self, req) -> None: - from Xlib.protocol.event import SelectionNotify # type: ignore[import] + from Xlib.protocol.event import SelectionNotify X = self._X with self._content_lock: @@ -424,14 +442,14 @@ def __init__(self, settings: config.Settings) -> None: self._poll_thread: threading.Thread | None = None self._in_thread: threading.Thread | None = None self._in_queue: queue.SimpleQueue[str] = queue.SimpleQueue() - self._observer: Observer | None = None + self._observer: BaseObserver | None = None self._last_synced: str | bytes | None = None self._lock = threading.Lock() self._last_read_error: str | None = None self._last_write_error: str | None = None self._last_decrypt_error: str | None = None - self._xfixes_queue: "queue.SimpleQueue[object] | None" = None - self._clipboard_owner: "_XlibClipboardOwner | None" = None + self._xfixes_queue: queue.SimpleQueue[object] | None = None + self._clipboard_owner: _XlibClipboardOwner | None = None self._history = ClipboardHistory(settings) @property @@ -452,8 +470,10 @@ def start(self) -> None: _no_xlib = os.environ.get("CLIPSYNC_NO_XLIB") self._xfixes_queue = _try_start_xfixes_watcher() if not _no_xfixes else None if self._xfixes_queue is None: - log.debug("XFixes unavailable%s; falling back to clipboard polling", - " (CLIPSYNC_NO_XFIXES set)" if _no_xfixes else "") + log.debug( + "XFixes unavailable%s; falling back to clipboard polling", + " (CLIPSYNC_NO_XFIXES set)" if _no_xfixes else "", + ) if not _no_xlib: self._clipboard_owner = _try_start_xlib_clipboard_owner() if self._clipboard_owner is None: @@ -491,6 +511,14 @@ def stop(self) -> None: self._poll_thread.join(timeout=3) log.info("Clipboard sync stopped") + def clear_history(self) -> None: + """Drop all stored clipboard history entries (called from UI events). + + Exposed as a public method so callers don't need to reach into + the private _history attribute. + """ + self._history.clear() + def _passphrase(self) -> str: val = self._settings.get("encryption_passphrase") or "" return val if isinstance(val, str) else "" diff --git a/clipsync/config.py b/clipsync/config.py index d4f1745..8c8ea83 100644 --- a/clipsync/config.py +++ b/clipsync/config.py @@ -114,14 +114,35 @@ def _load(self) -> None: with self._path.open("r", encoding="utf-8") as fh: loaded = json.load(fh) except (OSError, json.JSONDecodeError) as exc: + # Do NOT overwrite a corrupted/unreadable file with defaults. + # The user may still be able to recover it manually; clobbering + # it here turns a transient parse error into permanent data + # loss. Stay on defaults in memory and let the next successful + # set() re-persist. logging.warning("Failed to read settings, using defaults: %s", exc) - loaded = {} + return + if not isinstance(loaded, dict): + logging.warning("Settings file did not contain a JSON object; using defaults") + return merged = dict(DEFAULT_SETTINGS) merged.update({k: v for k, v in loaded.items() if k in DEFAULT_SETTINGS}) if not merged.get("api_key"): merged["api_key"] = uuid.uuid4().hex self._data = merged - self._persist_locked() + # Only persist if the on-disk file is incomplete (missing a default + # key) or has an empty api_key that we just generated. Otherwise + # leave the file alone: rewriting it on every startup is needless + # churn and could race with a concurrent writer (e.g. a UI + # subprocess that just wrote a new value). + loaded_keys = set(loaded.keys()) + needs_persist = not loaded.get("api_key") or any(k not in loaded_keys for k in DEFAULT_SETTINGS) + if needs_persist: + self._persist_locked() + else: + try: + self._mtime_ns = self._path.stat().st_mtime_ns + except OSError: + pass def _persist_locked(self) -> None: self._path.parent.mkdir(parents=True, exist_ok=True) @@ -178,6 +199,9 @@ def reload(self) -> None: except (OSError, json.JSONDecodeError) as exc: logging.warning("Failed to reload settings: %s", exc) return + if not isinstance(loaded, dict): + logging.warning("Settings file did not contain a JSON object; keeping in-memory state") + return merged = dict(DEFAULT_SETTINGS) merged.update({k: v for k, v in loaded.items() if k in DEFAULT_SETTINGS}) self._data = merged @@ -213,9 +237,8 @@ def configure_logging() -> None: datefmt="%Y-%m-%d %H:%M:%S", ) from logging.handlers import RotatingFileHandler - file_handler = RotatingFileHandler( - LOG_FILE, encoding="utf-8", maxBytes=10 * 1024 * 1024, backupCount=3 - ) + + file_handler = RotatingFileHandler(LOG_FILE, encoding="utf-8", maxBytes=10 * 1024 * 1024, backupCount=3) file_handler.setFormatter(fmt) stream_handler = logging.StreamHandler() stream_handler.setFormatter(fmt) diff --git a/clipsync/file_transfer.py b/clipsync/file_transfer.py index bc4c164..87fc4e7 100644 --- a/clipsync/file_transfer.py +++ b/clipsync/file_transfer.py @@ -13,6 +13,7 @@ from __future__ import annotations import logging +import os import shutil import time from collections.abc import Callable @@ -20,6 +21,7 @@ from watchdog.events import FileSystemEvent, FileSystemEventHandler from watchdog.observers import Observer +from watchdog.observers.api import BaseObserver from . import config from .debug import _safe_hostname @@ -38,7 +40,7 @@ def __init__( ) -> None: self._settings = settings self._on_received = on_received - self._observer: Observer | None = None + self._observer: BaseObserver | None = None @property def files_dir(self) -> Path: @@ -105,7 +107,7 @@ def _handle(self, path: Path) -> None: def on_created(self, event: FileSystemEvent) -> None: if not event.is_directory: - self._handle(Path(event.src_path)) + self._handle(Path(os.fsdecode(event.src_path))) def on_moved(self, event: FileSystemEvent) -> None: # Syncthing uses atomic rename: .syncthing.*.tmp → final name. diff --git a/clipsync/history.py b/clipsync/history.py index baff233..0eb7b5e 100644 --- a/clipsync/history.py +++ b/clipsync/history.py @@ -15,14 +15,11 @@ import threading import time from dataclasses import dataclass -from typing import TYPE_CHECKING, Any +from typing import Any from . import config from .crypto import decrypt, encrypt, is_encrypted -if TYPE_CHECKING: - pass - log = logging.getLogger(__name__) diff --git a/clipsync/main.py b/clipsync/main.py index 1a41422..04570c7 100644 --- a/clipsync/main.py +++ b/clipsync/main.py @@ -17,6 +17,7 @@ from __future__ import annotations import logging +import os import platform import shutil import signal @@ -29,8 +30,8 @@ from . import config, update from .clipboard import ClipboardSync -from .file_transfer import FileTransfer from .debug import LogMirror +from .file_transfer import FileTransfer from .pairing import PendingDeviceWatcher, accept_pending_device from .single_instance import AlreadyRunning, SingleInstance from .syncthing import SyncthingError, SyncthingService @@ -374,7 +375,7 @@ def _handle_ui_event(self, evt: dict) -> None: self._on_folder_changed(path) elif kind == "clear_history": if self.clipboard is not None: - self.clipboard._history.clear() + self.clipboard.clear_history() log.info("Clipboard history cleared from UI") elif kind == "reset": log.info("Devices reset from UI") @@ -431,15 +432,39 @@ def _send_file_worker(self, source: Path) -> None: def _on_file_received(self, path: Path, sender: str) -> None: downloads = Path.home() / "Downloads" downloads.mkdir(parents=True, exist_ok=True) + stem = path.stem + suffix = path.suffix + # Atomically claim the destination filename with O_EXCL to close + # the TOCTOU window between two concurrent receives of same-named + # files from different senders: previously the exists() check + + # copy2 could race and clobber each other. dest = downloads / path.name - if dest.exists(): - stem = path.stem - suffix = path.suffix - i = 1 - while dest.exists(): - dest = downloads / f"{stem}_{i}{suffix}" - i += 1 - shutil.copy2(path, dest) + fd = -1 + attempt = 0 + while attempt < 1000: + try: + fd = os.open(str(dest), os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644) + break + except FileExistsError: + attempt += 1 + dest = downloads / f"{stem}_{attempt}{suffix}" + if fd < 0: + log.warning("Could not find free filename for received file %s", path.name) + return + try: + with os.fdopen(fd, "wb") as out, path.open("rb") as src: + shutil.copyfileobj(src, out) + try: + shutil.copystat(path, dest) + except OSError: + pass + except OSError: + log.exception("Failed to save received file %s", path) + try: + dest.unlink(missing_ok=True) + except OSError: + pass + return log.info("Saved received file to %s", dest) self._notify(f"File from {sender}", f"Saved to ~/Downloads/{dest.name}") diff --git a/clipsync/pairing.py b/clipsync/pairing.py index 19d2ece..05f4265 100644 --- a/clipsync/pairing.py +++ b/clipsync/pairing.py @@ -27,15 +27,40 @@ _DEVICE_ID_RE = re.compile(r"^[A-Z2-7]{7}(-[A-Z2-7]{7}){7}$") +def _validate_luhn_checksums(device_id: str) -> bool: + """Verify the Luhn mod-32 check character in each of the 4 14-char blocks. + + Syncthing device IDs are 56 base32 chars split as 4 x (13 data + 1 + Luhn check). The hyphens split into 8 groups of 7, so they are not + aligned with the Luhn block boundaries; we strip them first. + """ + from .syncthing import _luhn32 + + raw = device_id.replace("-", "") + if len(raw) != 56: + return False + for i in range(0, 56, 14): + block = raw[i : i + 14] + if _luhn32(block[:13]) != block[13]: + return False + return True + + def normalize_device_id(raw: str) -> str | None: - """Strip whitespace, uppercase, and validate the device ID shape.""" + """Strip whitespace, uppercase, validate the device ID shape and checksum.""" if not raw: return None candidate = raw.strip().upper() candidate = re.sub(r"\s+", "", candidate) - if _DEVICE_ID_RE.match(candidate): - return candidate - return None + if not _DEVICE_ID_RE.match(candidate): + return None + # Shape is valid; also check the Luhn mod-32 checksums so a typo'd + # (but well-formed) ID is rejected here with a clear "invalid" result + # instead of being sent to Syncthing, which would reject it later + # with a vaguer error. + if not _validate_luhn_checksums(candidate): + return None + return candidate def generate_qr(device_id: str, box_size: int = 8, border: int = 2) -> Image.Image: diff --git a/clipsync/syncthing.py b/clipsync/syncthing.py index 81da5e1..8ae1e7b 100644 --- a/clipsync/syncthing.py +++ b/clipsync/syncthing.py @@ -79,37 +79,75 @@ def _release_asset_url(version: str) -> str: return f"https://github.com/syncthing/syncthing/releases/download/{v}/{stem}.{ext}" -# SHA-256 hashes of known-good Syncthing binaries for supply-chain verification. -# Expand this dictionary as new platforms/versions are validated. -_KNOWN_BINARY_HASHES: dict[tuple[str, str, str], str] = { - ("linux", "amd64", "v2.0.16"): ("ef9fd7380fc3a4a000e2cc213e56697a091d7b5cd6e540026b14566bc85e3a4b"), -} +def _fetch_official_sha256sums(version: str) -> dict[str, str]: + """Fetch Syncthing's published sha256sum.txt.asc for *version*. + + Returns a dict mapping archive filename (e.g. + ``syncthing-linux-amd64-v2.0.16.tar.gz``) to its lowercase hex + SHA-256. Returns an empty dict on any failure (network error, + unexpected format). The file is a PGP-signed message; we parse + only the `` `` lines and skip the armor header. + """ + v = version if version.startswith("v") else f"v{version}" + url = f"https://github.com/syncthing/syncthing/releases/download/{v}/sha256sum.txt.asc" + try: + data = _download(url) + except URLError as exc: + log.warning("Failed to fetch Syncthing sha256sum.txt.asc: %s", exc) + return {} + out: dict[str, str] = {} + for raw_line in data.decode("ascii", errors="replace").splitlines(): + line = raw_line.strip() + if not line or line.startswith("-----") or line.startswith("Hash:"): + continue + parts = line.split(None, 1) + if len(parts) != 2: + continue + digest, name = parts + name = name.strip().lstrip("*") + digest = digest.lower() + if len(digest) == 64 and all(c in "0123456789abcdef" for c in digest): + out[name] = digest + return out -def _verify_binary_hash(binary: Path, version: str) -> None: - """Raise SyncthingError if the binary hash doesn't match the known value.""" +def _archive_filename(version: str) -> str: + """Return the release asset filename for this platform and version.""" + os_name, arch, ext = _platform_archive_info() + v = version if version.startswith("v") else f"v{version}" + return f"syncthing-{os_name}-{arch}-{v}.{ext}" + + +def _verify_archive_hash(data: bytes, version: str) -> None: + """Raise SyncthingError if *data* (the downloaded archive bytes) does + not match the official Syncthing sha256sum.txt.asc entry for this + platform. Falls back to a logged warning (not an error) if the + official sums file cannot be fetched or the platform entry is absent. + """ try: - os_name, arch, _ext = _platform_archive_info() + archive_name = _archive_filename(version) except SyncthingError: - log.warning("Cannot determine platform for binary hash verification; skipping") + log.warning("Cannot determine platform for archive hash verification; skipping") return - key = (os_name, arch, version if version.startswith("v") else f"v{version}") - expected = _KNOWN_BINARY_HASHES.get(key) + sums = _fetch_official_sha256sums(version) + expected = sums.get(archive_name) if expected is None: - log.warning("No known hash for %s %s %s; skipping verification", *key) + log.warning( + "No hash for %s in sha256sum.txt.asc; skipping archive verification", + archive_name, + ) return import hashlib - h = hashlib.sha256(binary.read_bytes()).hexdigest() - if h != expected: - binary.unlink(missing_ok=True) + actual = hashlib.sha256(data).hexdigest() + if actual != expected: raise SyncthingError( - f"Syncthing binary hash mismatch for {key}: expected {expected}, got {h}. " - f"The binary has been deleted. Please retry." + f"Syncthing archive hash mismatch for {archive_name}: " + f"expected {expected}, got {actual}. Refusing to extract." ) - log.info("Syncthing binary hash verified for %s", key) + log.info("Syncthing archive hash verified for %s", archive_name) def _download(url: str) -> bytes: @@ -191,8 +229,8 @@ def ensure_binary(version: str = config.SYNCTHING_VERSION) -> Path: data = _download(url) except URLError as exc: raise SyncthingError(f"Failed to download Syncthing: {exc}") from exc + _verify_archive_hash(data, version) extracted = _extract_binary(data, ext, config.SYNCTHING_BIN_DIR) - _verify_binary_hash(extracted, version) log.info("Installed syncthing binary at %s", extracted) return extracted @@ -766,6 +804,11 @@ def _pump_output(self, proc: subprocess.Popen) -> None: which is useless for diagnosing startup failures (DB locked, port bound, bad config, etc.). The reader must keep draining the pipe or syncthing will eventually block on write. + + Output is demoted to DEBUG to keep clipsync's own INFO stream + readable; syncthing is verbose and its lines are not level-tagged. + The monitor thread still logs the exit code at ERROR when the + process actually dies, so real failures remain visible. """ stream = proc.stdout if stream is None: @@ -774,7 +817,7 @@ def _pump_output(self, proc: subprocess.Popen) -> None: for line in iter(stream.readline, ""): line = line.rstrip() if line: - log.info("syncthing: %s", line) + log.debug("syncthing: %s", line) except Exception: log.debug("Syncthing output pump error", exc_info=True) finally: diff --git a/clipsync/ui.py b/clipsync/ui.py index f3ae472..81f33ce 100644 --- a/clipsync/ui.py +++ b/clipsync/ui.py @@ -16,6 +16,7 @@ import threading from collections.abc import Callable from pathlib import Path +from typing import Any, cast import customtkinter as ctk import requests @@ -73,7 +74,10 @@ def open(self, window: str) -> None: # of silently swallowed. Open in binary-append so it doesn't race # with the parent's RotatingFileHandler. try: - stderr_fh = open(config.LOG_FILE, "ab") + # Deliberately not a context manager: the handle must outlive + # this function for the subprocess's stderr; it's closed + # explicitly below once Popen has its own reference. + stderr_fh = open(config.LOG_FILE, "ab") # noqa: SIM115 except OSError: stderr_fh = subprocess.DEVNULL # type: ignore[assignment] kwargs: dict = dict( @@ -95,6 +99,7 @@ def open(self, window: str) -> None: # process that wasn't directly activated by user input. try: import ctypes + ctypes.windll.user32.AllowSetForegroundWindow(proc.pid) # type: ignore[attr-defined] except Exception: pass @@ -521,7 +526,10 @@ def _on_frame(self, frame: object) -> None: except ImportError: return try: - rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) + # frame is `object` at this boundary (the callback interface + # doesn't commit to a cv2/numpy type); narrow it here since this + # is the one place that actually knows it came from cv2.VideoCapture. + rgb = cv2.cvtColor(cast(Any, frame), cv2.COLOR_BGR2RGB) h, w = rgb.shape[:2] target_w, target_h = self._preview_size scale = min(target_w / w, target_h / h) diff --git a/pyproject.toml b/pyproject.toml index 5fe41b8..d764f32 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,3 +1,54 @@ +[build-system] +requires = ["setuptools>=68"] +build-backend = "setuptools.build_meta" + +[project] +name = "offbyonebit-clipsync" +dynamic = ["version"] +description = "Peer-to-peer clipboard sync over Syncthing." +readme = "README.md" +license = { file = "LICENSE" } +requires-python = ">=3.11" +authors = [{ name = "offbyonebit" }] +classifiers = [ + "License :: OSI Approved :: MIT License", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3 :: Only", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Operating System :: Microsoft :: Windows", + "Operating System :: MacOS :: MacOS X", + "Operating System :: POSIX :: Linux", + "Topic :: Communications :: File Sharing", +] +dependencies = [ + "customtkinter>=5.2.2", + "pystray>=0.19.5", + "pyperclip>=1.9.0", + "qrcode[pil]>=7.4.2", + "Pillow>=10.0.0", + "watchdog>=4.0.0", + "requests>=2.31.0", + "opencv-python>=4.9.0.80", + "cryptography>=41.0.0", + "pyobjc-core>=10.0; sys_platform == 'darwin'", + "pyobjc-framework-Cocoa>=10.0; sys_platform == 'darwin'", +] + +[project.scripts] +clipsync = "clipsync.main:main" + +[project.urls] +Homepage = "https://github.com/offbyonebit/clipsync" +Source = "https://github.com/offbyonebit/clipsync" +Issues = "https://github.com/offbyonebit/clipsync/issues" + +[tool.setuptools.dynamic] +version = { attr = "clipsync.__version__" } + +[tool.setuptools.packages.find] +include = ["clipsync*"] + [tool.ruff] line-length = 120 target-version = "py311" diff --git a/requirements-dev.txt b/requirements-dev.txt new file mode 100644 index 0000000..db1be50 --- /dev/null +++ b/requirements-dev.txt @@ -0,0 +1,6 @@ +# Build / dev tooling. Install with: pip install -r requirements-dev.txt +-r requirements.txt +pyinstaller>=6.3.0 +ruff>=0.6.0 +mypy>=1.10 +pytest>=8.0 diff --git a/requirements.txt b/requirements.txt index 2aea016..9da088e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,7 +6,6 @@ Pillow>=10.0.0 watchdog>=4.0.0 requests>=2.31.0 opencv-python>=4.9.0.80 -pyinstaller>=6.3.0 cryptography>=41.0.0 pyobjc-core>=10.0; sys_platform == "darwin" pyobjc-framework-Cocoa>=10.0; sys_platform == "darwin" diff --git a/tests/test_cross_os_sync.py b/tests/test_cross_os_sync.py index df53079..1b4893c 100644 --- a/tests/test_cross_os_sync.py +++ b/tests/test_cross_os_sync.py @@ -84,6 +84,16 @@ def make_pair(tmp_path, monkeypatch): (os_a, os_b) pair sharing one sync folder.""" monkeypatch.setattr(config, "CLIPBOARD_POLL_INTERVAL", POLL) monkeypatch.setattr(clipboard_module, "Observer", PollingObserver) + # Force the polling OUT loop regardless of the host's own display state. + # On a real X11 desktop, start() opportunistically wires up a *real* + # XFixes watcher tied to the actual system clipboard; the OUT loop then + # waits on real selection-owner-change events instead of polling, so + # this test's fake clipboard.set() (which never touches the real X + # server) would never trigger a sync. Headless CI has no DISPLAY so it + # falls back to polling naturally, but that shouldn't be required to + # pass on a Linux dev machine with a graphical session. + monkeypatch.setenv("CLIPSYNC_NO_XFIXES", "1") + monkeypatch.setenv("CLIPSYNC_NO_XLIB", "1") syncs: list[ClipboardSync] = [] diff --git a/tests/test_crypto.py b/tests/test_crypto.py new file mode 100644 index 0000000..bf7e317 --- /dev/null +++ b/tests/test_crypto.py @@ -0,0 +1,97 @@ +"""Tests for the Fernet-based clipboard encryption helpers.""" + +from __future__ import annotations + +import pytest + +from clipsync import crypto + + +def test_roundtrip_v1_payload() -> None: + payload = b"hello clipboard" + token = crypto.encrypt(payload, "correct horse battery staple") + assert crypto.is_encrypted(token) + assert crypto.decrypt(token, "correct horse battery staple") == payload + + +def test_roundtrip_empty_payload() -> None: + payload = b"" + token = crypto.encrypt(payload, "pw") + assert crypto.decrypt(token, "pw") == payload + + +def test_roundtrip_binary_payload() -> None: + payload = bytes(range(256)) + token = crypto.encrypt(payload, "pw") + assert crypto.decrypt(token, "pw") == payload + + +def test_decrypt_wrong_passphrase_returns_none() -> None: + token = crypto.encrypt(b"secret", "right") + assert crypto.decrypt(token, "wrong") is None + + +def test_decrypt_corrupted_payload_returns_none() -> None: + token = crypto.encrypt(b"secret", "pw") + # Flip a byte in the body. + corrupted = token[:-1] + bytes([token[-1] ^ 0xFF]) + assert crypto.decrypt(corrupted, "pw") is None + + +def test_decrypt_garbage_returns_none() -> None: + assert crypto.decrypt(b"not a csenc payload", "pw") is None + + +def test_decrypt_truncated_v1_payload_returns_none() -> None: + # Header + partial salt but no body. + truncated = crypto._ENC_MAGIC_V1 + b"\x00\x01" + assert crypto.decrypt(truncated, "pw") is None + + +def test_is_encrypted_detects_v0_and_v1() -> None: + v1 = crypto.encrypt(b"x", "pw") + v0 = crypto._ENC_MAGIC_V0 + b"legacy-token-bytes" + assert crypto.is_encrypted(v1) + assert crypto.is_encrypted(v0) + assert not crypto.is_encrypted(b"plain text") + assert not crypto.is_encrypted(b"") + + +def test_each_encrypt_uses_random_salt() -> None: + """Two encrypt() calls with the same input must produce different ciphertext.""" + a = crypto.encrypt(b"same", "pw") + b = crypto.encrypt(b"same", "pw") + assert a != b + # Both must still decrypt to the same plaintext. + assert crypto.decrypt(a, "pw") == b"same" + assert crypto.decrypt(b, "pw") == b"same" + + +def test_v0_legacy_payload_decrypts() -> None: + """v0 used a hardcoded salt; current code must still read those payloads.""" + # We can't call crypto.encrypt() to make a v0 (it always emits v1), so + # construct one by hand: magic + Fernet token derived with the legacy salt. + import base64 + + from cryptography.fernet import Fernet + from cryptography.hazmat.primitives import hashes + from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC + + kdf = PBKDF2HMAC(algorithm=hashes.SHA256(), length=32, salt=crypto._LEGACY_SALT, iterations=120_000) + key = base64.urlsafe_b64encode(kdf.derive(b"pw")) + token = Fernet(key).encrypt(b"legacy data") + v0_payload = crypto._ENC_MAGIC_V0 + token + + assert crypto.is_encrypted(v0_payload) + assert crypto.decrypt(v0_payload, "pw") == b"legacy data" + # A v0 payload encrypted with "pw" must not decrypt with a different passphrase. + assert crypto.decrypt(v0_payload, "other") is None + + +@pytest.mark.parametrize("passphrase", ["", " ", "p", "long " * 100, "ünïcödé"]) +def test_roundtrip_various_passphrases(passphrase: str) -> None: + payload = b"x" + # An empty passphrase is the "no encryption" path elsewhere; encrypt() + # still has to work if asked directly. + token = crypto.encrypt(payload, passphrase) + assert crypto.decrypt(token, passphrase) == payload diff --git a/tests/test_history.py b/tests/test_history.py new file mode 100644 index 0000000..46715e6 --- /dev/null +++ b/tests/test_history.py @@ -0,0 +1,192 @@ +"""Tests for the clipboard history manager.""" + +from __future__ import annotations + +import time + +import pytest + +from clipsync import config +from clipsync.history import ClipboardHistory, HistoryEntry + + +@pytest.fixture +def settings(tmp_path, monkeypatch) -> config.Settings: + """Settings pointed at a tmp_path so history.json lands there.""" + monkeypatch.setattr(config, "HISTORY_FILE", tmp_path / "clipsync_history.json") + s = config.Settings(path=tmp_path / "settings.json") + return s + + +def _make_settings_with(**overrides) -> config.Settings: + """Build a Settings-like stub without touching disk for defaults.""" + + values = { + "encryption_passphrase": "", + "history_enabled": True, + "history_max_items": 50, + "history_auto_clear_minutes": 0, + } + values.update(overrides) + + class _Stub: + def get(self, key, default=None): + return values.get(key, default) + + return _Stub() # type: ignore[return-value] + + +def test_add_and_retrieve_entries(settings) -> None: + h = ClipboardHistory(settings) + h.add_entry("first", "local") + h.add_entry("second", "remote") + entries = h.get_entries() + assert [e.text for e in entries] == ["first", "second"] + assert entries[0].source == "local" + assert entries[1].source == "remote" + + +def test_add_entry_dedups_consecutive_duplicates(settings) -> None: + h = ClipboardHistory(settings) + h.add_entry("same", "local") + h.add_entry("same", "local") + h.add_entry("same", "remote") # different source but same normalized text + h.add_entry("different", "local") + entries = h.get_entries() + assert [e.text for e in entries] == ["same", "different"] + + +def test_add_entry_dedups_after_newline_normalization(settings) -> None: + """CRLF and LF should compare equal for dedup purposes.""" + h = ClipboardHistory(settings) + h.add_entry("line1\nline2", "local") + h.add_entry("line1\r\nline2", "local") + entries = h.get_entries() + assert len(entries) == 1 + + +def test_add_entry_ignores_empty_text(settings) -> None: + h = ClipboardHistory(settings) + h.add_entry("", "local") + assert h.get_entries() == [] + + +def test_max_items_prunes_oldest() -> None: + s = _make_settings_with(history_max_items=3) + h = ClipboardHistory(s) + for i in range(5): + h.add_entry(f"item-{i}", "local") + entries = h.get_entries() + assert [e.text for e in entries] == ["item-2", "item-3", "item-4"] + + +def test_set_max_items_trims_existing(settings) -> None: + h = ClipboardHistory(settings) + for i in range(10): + h.add_entry(f"item-{i}", "local") + h.set_max_items(3) + entries = h.get_entries() + assert len(entries) == 3 + assert entries[-1].text == "item-9" + + +def test_clear_empties_history(settings) -> None: + h = ClipboardHistory(settings) + h.add_entry("a", "local") + h.add_entry("b", "local") + h.clear() + assert h.get_entries() == [] + + +def test_persistence_roundtrip(tmp_path, monkeypatch) -> None: + monkeypatch.setattr(config, "HISTORY_FILE", tmp_path / "clipsync_history.json") + s = config.Settings(path=tmp_path / "settings.json") + h = ClipboardHistory(s) + h.add_entry("persisted", "local") + + # New instance reads from disk. + h2 = ClipboardHistory(s) + entries = h2.get_entries() + assert len(entries) == 1 + assert entries[0].text == "persisted" + + +def test_encrypted_persistence_roundtrip(tmp_path, monkeypatch) -> None: + """With a passphrase set, the history file must be encrypted at rest.""" + monkeypatch.setattr(config, "HISTORY_FILE", tmp_path / "clipsync_history.json") + s = config.Settings(path=tmp_path / "settings.json") + s.set("encryption_passphrase", "secret-pw") + + h = ClipboardHistory(s) + h.add_entry("secret clipboard", "local") + + raw = config.HISTORY_FILE.read_bytes() + # Must start with the CSENC magic header — not plaintext JSON. + assert raw.startswith(b"CSENC"), "history file should be encrypted when a passphrase is set" + assert b"secret clipboard" not in raw, "plaintext must not appear in encrypted history file" + + # New instance with the right passphrase must decrypt and load. + h2 = ClipboardHistory(s) + entries = h2.get_entries() + assert len(entries) == 1 + assert entries[0].text == "secret clipboard" + + +def test_encrypted_history_wrong_passphrase_is_logged_not_raised(tmp_path, monkeypatch, caplog) -> None: + monkeypatch.setattr(config, "HISTORY_FILE", tmp_path / "clipsync_history.json") + s = config.Settings(path=tmp_path / "settings.json") + s.set("encryption_passphrase", "right") + + ClipboardHistory(s).add_entry("data", "local") + + # Now reload with a different passphrase. + s.set("encryption_passphrase", "wrong") + with caplog.at_level("WARNING"): + h2 = ClipboardHistory(s) + assert h2.get_entries() == [] + assert any("decrypt" in r.message.lower() for r in caplog.records) + + +def test_auto_clear_prunes_old_entries() -> None: + now = time.time() + s = _make_settings_with(history_auto_clear_minutes=5) + h = ClipboardHistory(s) + # Inject an old entry that should be pruned (older than the 5-minute window). + h._entries = [ + HistoryEntry(text="old", timestamp=now - 600, source="local"), + HistoryEntry(text="recent", timestamp=now - 10, source="local"), + ] + h._prune_old() + entries = h.get_entries() + assert [e.text for e in entries] == ["recent"] + + +def test_auto_clear_zero_keeps_everything() -> None: + s = _make_settings_with(history_auto_clear_minutes=0) + h = ClipboardHistory(s) + h._entries = [ + HistoryEntry(text="ancient", timestamp=0.0, source="local"), + ] + h._prune_old() + assert len(h.get_entries()) == 1 + + +def test_disabled_history_does_not_persist(tmp_path, monkeypatch) -> None: + monkeypatch.setattr(config, "HISTORY_FILE", tmp_path / "clipsync_history.json") + s = config.Settings(path=tmp_path / "settings.json") + s.set("history_enabled", False) + + h = ClipboardHistory(s) + h.add_entry("ignored", "local") + # History file should not have been created with payload. + assert not config.HISTORY_FILE.exists() or config.HISTORY_FILE.read_bytes() == b"" + + +def test_history_entry_roundtrip_dict() -> None: + entry = HistoryEntry(text="x", timestamp=1.5, source="remote") + d = entry.to_dict() + restored = HistoryEntry.from_dict(d) + assert restored == entry + # Default source is "local" when missing from dict. + partial = {"text": "y", "timestamp": 2.0} + assert HistoryEntry.from_dict(partial).source == "local" diff --git a/tests/test_image_sync.py b/tests/test_image_sync.py index 18dc139..42e07a8 100644 --- a/tests/test_image_sync.py +++ b/tests/test_image_sync.py @@ -60,6 +60,10 @@ def _wait_for(predicate, timeout: float = 5.0, interval: float = 0.05) -> bool: def two_sided(tmp_path, monkeypatch): monkeypatch.setattr(config, "CLIPBOARD_POLL_INTERVAL", POLL) monkeypatch.setattr(clipboard_module, "Observer", PollingObserver) + # Force the polling OUT loop; see test_cross_os_sync.py::make_pair for why + # a real X11 desktop session would otherwise bypass this test's fakes. + monkeypatch.setenv("CLIPSYNC_NO_XFIXES", "1") + monkeypatch.setenv("CLIPSYNC_NO_XLIB", "1") sync_folder = tmp_path / "shared_sync" sync_folder.mkdir() diff --git a/tests/test_linux_paste_freeze.py b/tests/test_linux_paste_freeze.py index 4589360..e3ecaa1 100644 --- a/tests/test_linux_paste_freeze.py +++ b/tests/test_linux_paste_freeze.py @@ -28,7 +28,6 @@ import sys import threading import time -import types import unittest.mock as mock import pytest @@ -38,8 +37,8 @@ import clipsync.clipboard as clipboard_module from clipsync import config from clipsync.clipboard import ( - ClipboardSync, _STOP_SENTINEL, + ClipboardSync, _read_image_from_system_clipboard, ) @@ -320,6 +319,12 @@ def two_sided_linux(tmp_path, monkeypatch): monkeypatch.setattr(config, "CLIPBOARD_POLL_INTERVAL", POLL) monkeypatch.setattr(clipboard_module, "Observer", PollingObserver) monkeypatch.setattr(sys, "platform", "linux") + # Force the polling fallback this fixture is meant to simulate. On a + # real X11 desktop, start() would otherwise wire up a genuine XFixes + # watcher (since sys.platform == "linux" here) and take the event-driven + # path instead, defeating the point of this fixture. + monkeypatch.setenv("CLIPSYNC_NO_XFIXES", "1") + monkeypatch.setenv("CLIPSYNC_NO_XLIB", "1") sync_folder = tmp_path / "shared_sync" sync_folder.mkdir() @@ -369,6 +374,4 @@ def test_text_syncs_on_linux_polling_fallback(two_sided_linux) -> None: """Text sync must work normally alongside image checks on the polling path.""" _, _, txt_a, _, txt_b, _ = two_sided_linux txt_a["value"] = "paste freeze fixed" - assert _wait_for(lambda: txt_b["value"] == "paste freeze fixed"), ( - f"Text did not sync; got {txt_b['value']!r}" - ) + assert _wait_for(lambda: txt_b["value"] == "paste freeze fixed"), f"Text did not sync; got {txt_b['value']!r}" diff --git a/tests/test_mac_windows_sync.py b/tests/test_mac_windows_sync.py index 93506a7..9e4d3c6 100644 --- a/tests/test_mac_windows_sync.py +++ b/tests/test_mac_windows_sync.py @@ -67,6 +67,10 @@ def two_sided(tmp_path, monkeypatch): # which would only happen in this test harness (two logical machines # share one real folder). Swap in PollingObserver to sidestep that. monkeypatch.setattr(clipboard_module, "Observer", PollingObserver) + # Force the polling OUT loop; see test_cross_os_sync.py::make_pair for why + # a real X11 desktop session would otherwise bypass this test's fakes. + monkeypatch.setenv("CLIPSYNC_NO_XFIXES", "1") + monkeypatch.setenv("CLIPSYNC_NO_XLIB", "1") sync_folder = tmp_path / "shared_sync" sync_folder.mkdir() diff --git a/tests/test_pairing.py b/tests/test_pairing.py new file mode 100644 index 0000000..5861784 --- /dev/null +++ b/tests/test_pairing.py @@ -0,0 +1,88 @@ +"""Tests for device ID normalization with Luhn checksum validation.""" + +from __future__ import annotations + +import pytest + +from clipsync import pairing + +# A real device ID derived from a generated cert.pem (valid Luhn checksums). +_VALID_DEVICE_ID = "T2ZW3LO-EJ72MIJ-XGQT2F6-XVHZDDQ-N2NA2O3-XIMRQLU-46PDBL4-TBHO5AA" + + +def test_normalize_valid_device_id() -> None: + assert pairing.normalize_device_id(_VALID_DEVICE_ID) == _VALID_DEVICE_ID + + +def test_normalize_strips_whitespace_and_uppercases() -> None: + assert pairing.normalize_device_id(" " + _VALID_DEVICE_ID.lower() + " ") == _VALID_DEVICE_ID + + +def test_normalize_handles_internal_whitespace() -> None: + spaced = " ".join(_VALID_DEVICE_ID) + assert pairing.normalize_device_id(spaced) == _VALID_DEVICE_ID + + +def test_normalize_rejects_empty() -> None: + assert pairing.normalize_device_id("") is None + assert pairing.normalize_device_id(None) is None # type: ignore[arg-type] + + +def test_normalize_rejects_too_short() -> None: + assert pairing.normalize_device_id("ABCDEFG-ABCDEFG") is None + + +def test_normalize_rejects_invalid_chars() -> None: + # Base32 alphabet is A-Z2-7; 1, 0, 8, 9 are not valid. + bad = _VALID_DEVICE_ID.replace("T", "1") + assert pairing.normalize_device_id(bad) is None + + +def test_normalize_rejects_typo_with_bad_luhn_checksum() -> None: + """Same shape, valid base32, but the Luhn check character is wrong. + + Without checksum validation this would slip through to Syncthing + and be rejected later with a vaguer error. + """ + # Flip the last char to break the final Luhn check. + last_char = _VALID_DEVICE_ID[-1] + bad_char = "B" if last_char != "B" else "C" + typo = _VALID_DEVICE_ID[:-1] + bad_char + assert pairing.normalize_device_id(typo) is None + + +def test_normalize_rejects_typo_in_data_block() -> None: + """A typo in any of the 13-char data blocks breaks its Luhn check.""" + # Mutate a middle character (not the check char) of the first block. + typo = "A" + _VALID_DEVICE_ID[1:] + assert pairing.normalize_device_id(typo) is None + + +@pytest.mark.parametrize("index", [0, 1, 2, 3]) +def test_validate_luhn_checksums_each_block(index: int) -> None: + """Corrupting any of the 4 blocks must fail validation.""" + raw = _VALID_DEVICE_ID.replace("-", "") + # Each block is 14 chars: 13 data + 1 check. Corrupt the check char. + block_start = index * 14 + 13 + original_check = raw[block_start] + # Pick a different valid base32 char as the corrupted check char. + new_char = next(c for c in "ABCDEFGHIJKLMNOPQRSTUVWXYZ234567" if c != original_check) + corrupted = raw[:block_start] + new_char + raw[block_start + 1 :] + corrupted_id = "-".join(corrupted[i : i + 7] for i in range(0, 56, 7)) + assert pairing.normalize_device_id(corrupted_id) is None + + +def test_pair_with_device_rejects_invalid_id() -> None: + from clipsync.syncthing import SyncthingClient + + client = SyncthingClient(api_key="x", base_url="http://127.0.0.1:0") + with pytest.raises(ValueError): + pairing.pair_with_device(client, "not-a-device-id") + + +def test_accept_pending_device_rejects_invalid_id() -> None: + from clipsync.syncthing import SyncthingClient + + client = SyncthingClient(api_key="x", base_url="http://127.0.0.1:0") + with pytest.raises(ValueError): + pairing.accept_pending_device(client, "not-a-device-id") diff --git a/tests/test_settings_reload.py b/tests/test_settings_reload.py index 55e398b..555d3a4 100644 --- a/tests/test_settings_reload.py +++ b/tests/test_settings_reload.py @@ -65,3 +65,120 @@ def counting_reload() -> None: settings.get("encryption_passphrase") assert calls["count"] == 0, "get() should not call reload() when mtime is unchanged" + + +def test_corrupted_settings_file_is_not_clobbered(tmp_path) -> None: + """A corrupted settings.json must NOT be overwritten with defaults. + + Previously a JSONDecodeError fell through to _persist_locked(), which + silently destroyed the user's file. The fix returns early so the + user can recover the file manually. + """ + path = tmp_path / "settings.json" + path.write_text("{ this is not valid json") + original_bytes = path.read_bytes() + + config.Settings(path=path) # must not raise, must not overwrite + + assert path.read_bytes() == original_bytes, "corrupted settings file was clobbered" + + +def test_corrupted_settings_falls_back_to_defaults_in_memory(tmp_path) -> None: + path = tmp_path / "settings.json" + path.write_text("{ broken") + + settings = config.Settings(path=path) + # In-memory state is defaults. api_key stays empty here; prepare_home() + # regenerates one on demand the next time Syncthing starts. + assert settings.get("sync_paused") is False + assert settings.get("show_notifications") is True + assert isinstance(settings.get("api_key"), str) + + +def test_load_does_not_persist_when_file_already_complete(tmp_path, monkeypatch) -> None: + """A settings.json that already contains every default key must not be + rewritten on every startup. We track writes via os.replace.""" + path = tmp_path / "settings.json" + settings = config.Settings(path=path) # writes once on creation + initial_mtime_ns = path.stat().st_mtime_ns + + # Reset mtime cache to force the next _load to actually stat the file. + settings._mtime_ns = 0 + settings._load() + + try: + final_mtime_ns = path.stat().st_mtime_ns + except OSError: + return + assert final_mtime_ns == initial_mtime_ns, "settings.json was needlessly rewritten on load" + + +def test_load_does_not_overwrite_when_external_writer_changes_a_value(tmp_path) -> None: + """If another process rewrites settings.json with a different value but + the same set of keys, _load() must pick up the new value in memory but + must NOT persist (which would needlessly rewrite the file on every + startup and could race with the external writer).""" + path = tmp_path / "settings.json" + settings = config.Settings(path=path) + + # Simulate a UI subprocess writing the file with a different value + # but the same complete set of keys. + data = json.loads(path.read_text()) + data["sync_paused"] = True + import time as _time + + _time.sleep(0.01) + path.write_text(json.dumps(data)) + mtime_after_external_write = path.stat().st_mtime_ns + + settings._mtime_ns = 0 # force re-read on next _load + settings._load() + + # In-memory state reflects the external write... + assert settings.get("sync_paused") is True + # ...and we did not rewrite the file (mtime unchanged). + assert path.stat().st_mtime_ns == mtime_after_external_write + + +def test_load_persists_when_default_key_missing_from_disk(tmp_path) -> None: + """An older settings.json missing a key that was added to DEFAULT_SETTINGS + in a later release must be repaired on disk (so the file stays complete + for any external reader that doesn't merge defaults).""" + path = tmp_path / "settings.json" + settings = config.Settings(path=path) + + data = json.loads(path.read_text()) + del data["show_notifications"] # simulate an upgrade from an older release + path.write_text(json.dumps(data)) + + settings._mtime_ns = 0 + settings._load() + + persisted = json.loads(path.read_text()) + assert "show_notifications" in persisted + assert persisted["show_notifications"] is True + assert settings.get("show_notifications") is True + + +def test_load_persists_when_api_key_missing_from_disk(tmp_path) -> None: + """A settings.json missing the api_key must be repaired (one generated + and written back) so Syncthing can talk to its own REST API.""" + path = tmp_path / "settings.json" + path.write_text(json.dumps({"sync_paused": True, "show_notifications": False})) + + settings = config.Settings(path=path) + assert settings.get("api_key") != "" + persisted = json.loads(path.read_text()) + assert persisted["api_key"] == settings.get("api_key") + + +def test_load_handles_non_object_json_without_clobbering(tmp_path) -> None: + """A settings.json that is valid JSON but not an object (e.g. an array) + must not be clobbered either; the same recovery principle applies.""" + path = tmp_path / "settings.json" + original_bytes = b'["not", "an", "object"]' + path.write_bytes(original_bytes) + + settings = config.Settings(path=path) + assert path.read_bytes() == original_bytes + assert settings.get("sync_paused") is False # defaults in memory diff --git a/tests/test_syncthing_device_id.py b/tests/test_syncthing_device_id.py new file mode 100644 index 0000000..a864b05 --- /dev/null +++ b/tests/test_syncthing_device_id.py @@ -0,0 +1,113 @@ +"""Tests for the Syncthing device-ID derivation and Luhn mod-32 checksum.""" + +from __future__ import annotations + +import base64 +import hashlib +from datetime import UTC, datetime, timedelta + +import pytest + +from clipsync.syncthing import _LUHN_BASE32, _device_id_from_cert, _luhn32 + + +def test_luhn32_known_values() -> None: + # Hand-verified values from Syncthing's reference implementation. + assert _luhn32("AAAAAAAAAAAAA") == "A" + assert _luhn32("ABCDEFGHIJKLM") == "O" + + +def test_luhn32_returns_valid_base32_char() -> None: + # Every output must be a single character from the base32 alphabet. + for c in _LUHN_BASE32: + # Build a 13-char data string from the same char to keep it stable. + result = _luhn32(c * 13) + assert len(result) == 1 + assert result in _LUHN_BASE32 + + +def test_luhn32_changes_with_input() -> None: + # Two different inputs must not always produce the same check char. + results = {_luhn32(_LUHN_BASE32[i] * 13) for i in range(len(_LUHN_BASE32))} + assert len(results) > 1 + + +def _generate_self_signed_cert(tmp_path): + """Generate a self-signed cert/key pair and return the cert path.""" + from cryptography import x509 + from cryptography.hazmat.primitives import hashes, serialization + from cryptography.hazmat.primitives.asymmetric import ec + from cryptography.x509.oid import NameOID + + key = ec.generate_private_key(ec.SECP256R1()) + subject = issuer = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, "clipsync-test")]) + cert = ( + x509.CertificateBuilder() + .subject_name(subject) + .issuer_name(issuer) + .public_key(key.public_key()) + .serial_number(x509.random_serial_number()) + .not_valid_before(datetime.now(UTC) - timedelta(days=1)) + .not_valid_after(datetime.now(UTC) + timedelta(days=1)) + .sign(key, hashes.SHA256()) + ) + tmp_path.mkdir(parents=True, exist_ok=True) + cert_path = tmp_path / "cert.pem" + cert_path.write_bytes(cert.public_bytes(serialization.Encoding.PEM)) + return cert_path, cert + + +def test_device_id_from_cert_matches_syncthing_algorithm(tmp_path) -> None: + """The derived device ID must match Syncthing's documented algorithm: + + SHA-256 of the DER-encoded cert -> base32 (no padding) -> split into + 4 x 13 chars, append a Luhn mod-32 check char to each -> split into + 8 groups of 7 with hyphens. + """ + cert_path, cert = _generate_self_signed_cert(tmp_path) + + from cryptography.hazmat.primitives.serialization import Encoding + + der = cert.public_bytes(Encoding.DER) + digest = hashlib.sha256(der).digest() + b32 = base64.b32encode(digest).decode("ascii").rstrip("=") + assert len(b32) == 52 + + expected_chunks = [b32[i : i + 13] for i in range(0, 52, 13)] + expected_with_checks = "".join(c + _luhn32(c) for c in expected_chunks) + expected_id = "-".join(expected_with_checks[i : i + 7] for i in range(0, 56, 7)) + + actual = _device_id_from_cert(cert_path) + assert actual == expected_id + + +def test_device_id_from_cert_has_correct_shape(tmp_path) -> None: + cert_path, _ = _generate_self_signed_cert(tmp_path) + device_id = _device_id_from_cert(cert_path) + # 8 groups of 7 base32 chars separated by hyphens = 56 chars + 7 hyphens. + assert len(device_id) == 63 + assert device_id.count("-") == 7 + raw = device_id.replace("-", "") + assert len(raw) == 56 + assert all(c in _LUHN_BASE32 for c in raw) + + +def test_device_id_from_cert_validates_under_pairing_normalize(tmp_path) -> None: + """A derived device ID must pass our own normalize_device_id check + (which includes the Luhn checksum validation).""" + from clipsync.pairing import normalize_device_id + + cert_path, _ = _generate_self_signed_cert(tmp_path) + device_id = _device_id_from_cert(cert_path) + assert normalize_device_id(device_id) == device_id + + +def test_device_id_from_cert_distinct_for_distinct_certs(tmp_path) -> None: + a, _ = _generate_self_signed_cert(tmp_path / "a") + b, _ = _generate_self_signed_cert(tmp_path / "b") + assert _device_id_from_cert(a) != _device_id_from_cert(b) + + +def test_device_id_from_cert_raises_on_missing_file(tmp_path) -> None: + with pytest.raises(OSError): + _device_id_from_cert(tmp_path / "nonexistent.pem") diff --git a/tests/test_syncthing_hash.py b/tests/test_syncthing_hash.py new file mode 100644 index 0000000..7dd4ad0 --- /dev/null +++ b/tests/test_syncthing_hash.py @@ -0,0 +1,122 @@ +"""Tests for the Syncthing archive hash verification. + +Uses mocked network calls so the tests are hermetic; the real fetch +logic is exercised end-to-end in the integration tests. +""" + +from __future__ import annotations + +import hashlib +from unittest.mock import patch + +import pytest + +from clipsync import syncthing +from clipsync.syncthing import SyncthingError + +_SAMPLE_SHA256_CONTENT = ( + "-----BEGIN PGP SIGNED MESSAGE-----\n" + "Hash: SHA256\n" + "\n" + "d5ca379993844b0e6e4fced05e3ac4a6c4513dee916ab65516c6d07d5e53e317 syncthing-linux-amd64-v2.0.16.tar.gz\n" + "2b5fe419de35c26354843ae567b2ae5c1bf82b151e3aea3dcfb620ca590999d4 syncthing-macos-amd64-v2.0.16.zip\n" + "5b519408c11e69e712702911caa399077e3fc602d8d70d6147f620e67bd83037 syncthing-windows-amd64-v2.0.16.zip\n" + "-----BEGIN PGP SIGNATURE-----\n" + "irrelevant\n" + "-----END PGP SIGNATURE-----\n" +) + + +def test_fetch_official_sha256sums_parses_pgp_wrapped_file() -> None: + with patch("clipsync.syncthing._download", return_value=_SAMPLE_SHA256_CONTENT.encode("ascii")): + sums = syncthing._fetch_official_sha256sums("v2.0.16") + assert sums == { + "syncthing-linux-amd64-v2.0.16.tar.gz": "d5ca379993844b0e6e4fced05e3ac4a6c4513dee916ab65516c6d07d5e53e317", + "syncthing-macos-amd64-v2.0.16.zip": "2b5fe419de35c26354843ae567b2ae5c1bf82b151e3aea3dcfb620ca590999d4", + "syncthing-windows-amd64-v2.0.16.zip": "5b519408c11e69e712702911caa399077e3fc602d8d70d6147f620e67bd83037", + } + + +def test_fetch_official_sha256sums_normalizes_v_prefix() -> None: + """Version may be passed with or without the leading 'v'.""" + seen_urls = [] + + def fake_download(url: str) -> bytes: + seen_urls.append(url) + return _SAMPLE_SHA256_CONTENT.encode("ascii") + + with patch("clipsync.syncthing._download", side_effect=fake_download): + syncthing._fetch_official_sha256sums("2.0.16") + assert seen_urls == ["https://github.com/syncthing/syncthing/releases/download/v2.0.16/sha256sum.txt.asc"] + + +def test_fetch_official_sha256sums_returns_empty_on_network_error() -> None: + from urllib.error import URLError + + with patch("clipsync.syncthing._download", side_effect=URLError("offline")): + sums = syncthing._fetch_official_sha256sums("v2.0.16") + assert sums == {} + + +def test_fetch_official_sha256sums_ignores_malformed_lines() -> None: + bad_content = ( + "-----BEGIN PGP SIGNED MESSAGE-----\n" + "Hash: SHA256\n" + "\n" + "not a hash line\n" + "abc123 short-hash\n" + "d5ca379993844b0e6e4fced05e3ac4a6c4513dee916ab65516c6d07d5e53e317 syncthing-linux-amd64-v2.0.16.tar.gz\n" + ) + with patch("clipsync.syncthing._download", return_value=bad_content.encode("ascii")): + sums = syncthing._fetch_official_sha256sums("v2.0.16") + assert sums == { + "syncthing-linux-amd64-v2.0.16.tar.gz": "d5ca379993844b0e6e4fced05e3ac4a6c4513dee916ab65516c6d07d5e53e317", + } + + +def test_verify_archive_hash_succeeds_on_match(monkeypatch) -> None: + archive = b"the bytes of a syncthing archive" + expected = hashlib.sha256(archive).hexdigest() + name = "syncthing-linux-amd64-v2.0.16.tar.gz" + + monkeypatch.setattr(syncthing, "_fetch_official_sha256sums", lambda _v: {name: expected}) + # Should not raise. + syncthing._verify_archive_hash(archive, "v2.0.16") + + +def test_verify_archive_hash_raises_on_mismatch(monkeypatch) -> None: + archive = b"the bytes of a syncthing archive" + name = "syncthing-linux-amd64-v2.0.16.tar.gz" + + monkeypatch.setattr(syncthing, "_fetch_official_sha256sums", lambda _v: {name: "0" * 64}) + with pytest.raises(SyncthingError, match="hash mismatch"): + syncthing._verify_archive_hash(archive, "v2.0.16") + + +def test_verify_archive_hash_skips_when_platform_absent(monkeypatch) -> None: + """If sha256sum.txt has no entry for our platform, verification must + be skipped (logged) rather than fail. Otherwise an unusual platform + would be unable to install even when Syncthing ships a binary for it.""" + archive = b"some bytes" + monkeypatch.setattr(syncthing, "_fetch_official_sha256sums", lambda _v: {}) + # Should not raise. + syncthing._verify_archive_hash(archive, "v2.0.16") + + +def test_verify_archive_hash_skips_when_fetch_fails(monkeypatch) -> None: + archive = b"some bytes" + monkeypatch.setattr(syncthing, "_fetch_official_sha256sums", lambda _v: {}) + syncthing._verify_archive_hash(archive, "v2.0.16") + + +def test_archive_filename_matches_release_naming() -> None: + name = syncthing._archive_filename("v2.0.16") + # Must match the asset names Syncthing actually publishes. + assert name in { + "syncthing-linux-amd64-v2.0.16.tar.gz", + "syncthing-macos-amd64-v2.0.16.zip", + "syncthing-windows-amd64-v2.0.16.zip", + "syncthing-linux-arm64-v2.0.16.tar.gz", + "syncthing-macos-arm64-v2.0.16.zip", + "syncthing-windows-arm64-v2.0.16.zip", + }