Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions .github/workflows/build_deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ jobs:
contents: read
steps:
- name: Check out code
uses: actions/checkout@v4
uses: actions/checkout@v6
with:
fetch-depth: 0 # needed for tags for dunamai
- name: Install uv
uses: astral-sh/setup-uv@v6
uses: astral-sh/setup-uv@v8.2.0
with:
enable-cache: true
- uses: actions/setup-python@v5
- uses: actions/setup-python@v6.2.0
with:
python-version: "3.10"
- name: Install dependencies
Expand All @@ -40,6 +40,7 @@ jobs:
uv run pytest --ignore=tests/extras
- name: Smoke Test Extras - Edgar and HTTP2
run: |
uv sync --group edgartest
uv run pytest tests/extras/test_edgartools.py tests/extras/test_http2.py
- name: Upload package to PyPI
if: startsWith(github.ref, 'refs/tags/v')
Expand Down
10 changes: 5 additions & 5 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ concurrency:
cancel-in-progress: true

env:
LATEST_PY_VERSION: &latest '3.13'
LATEST_PY_VERSION: &latest '3.14'
EDGAR_IDENTITY: "Iq De de_iq@iqmo.com"

jobs:
Expand All @@ -38,15 +38,15 @@ jobs:
contents: write
steps:
- name: Check out code
uses: actions/checkout@v4
uses: actions/checkout@v6
with:
fetch-depth: 0
- uses: actions/setup-python@v5
- uses: actions/setup-python@v6.2.0
with:
python-version: ${{ matrix.python-version }}
allow-prereleases: true
- name: Install uv
uses: astral-sh/setup-uv@v6
uses: astral-sh/setup-uv@v8.2.0
with:
enable-cache: true
python-version: ${{ matrix.python-version }}
Expand All @@ -61,7 +61,7 @@ jobs:
- name: Smoke Test Extras - Edgar and HTTP2
if: ${{ matrix.python-version == env.LATEST_PY_VERSION && matrix.os == 'ubuntu-latest' }}
run: |
uv pip install edgartools h2
uv sync --group edgartest
uv run pytest tests/extras/test_edgartools.py tests/extras/test_http2.py
- name: Test Proxy
if: ${{ matrix.python-version == env.LATEST_PY_VERSION && matrix.os == 'ubuntu-latest' }}
Expand Down
3 changes: 2 additions & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ repos:
- id: check-added-large-files
- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.
rev: v0.15.5
rev: v0.15.16
hooks:
# Run the linter.
- id: ruff-check
Expand All @@ -27,6 +27,7 @@ repos:
language: node
pass_filenames: true
additional_dependencies: ["pyright"]
exclude: '^benchmarks/'
- repo: local
hooks:
- id: pylint
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/edgar_large_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
),
)

BUCKET = "hishel-cache-test"
BUCKET = "large-cache-test"


def clear_cache():
Expand Down
71 changes: 44 additions & 27 deletions httpxthrottlecache/filecache/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,18 @@

"""

import hashlib
import json
import logging
import os
import time
from email.utils import formatdate, parsedate_to_datetime
from pathlib import Path
from typing import Callable, Iterator, Optional, Tuple, Union
from typing import Callable, Iterator, Optional, Tuple, Union, cast
from urllib.parse import quote, unquote

import aiofiles
import httpx
import httpx2
from filelock import AsyncFileLock, FileLock

from ..controller import get_rule_for_request
Expand All @@ -25,7 +26,7 @@ class AlreadyLockedError(Exception):
pass


class DualFileStream(httpx.SyncByteStream, httpx.AsyncByteStream):
class DualFileStream(httpx2.SyncByteStream, httpx2.AsyncByteStream):
def __init__(
self,
path: Path,
Expand Down Expand Up @@ -78,12 +79,16 @@ def _load_meta(self, p: Path) -> dict[str, str]:
return {}

def to_path(self, host: str, path: str, query: str) -> Path:
"""Map a request to a cache file path. Pure: does not touch the filesystem.

Short hash is added in case readable names collide (e.g. /a/b, /a-b, and ?a=b)
"""
site = host.lower().rstrip(".")
(self.cache_dir / site).mkdir(parents=True, exist_ok=True)
name = unquote(path).strip("/").replace("/", "-") or "index"
if query:
name += "-" + unquote(query).replace("&", "-").replace("=", "-")
return self.cache_dir / site / quote(name, safe="._-~")
digest = hashlib.blake2b(f"{path}?{query}".encode(), digest_size=4).hexdigest()
return self.cache_dir / site / (quote(name, safe="._-~") + "-" + digest)

def get_if_fresh(
self, host: str, path: str, query: str, cache_rules: dict[str, dict[str, Union[bool, int]]]
Expand Down Expand Up @@ -116,7 +121,7 @@ def get_if_fresh(


class _TeeCore:
def __init__(self, resp: httpx.Response, path: Path, locking: bool, last_modified: str, access_date: str):
def __init__(self, resp: httpx2.Response, path: Path, locking: bool, last_modified: str, access_date: str):
assert path is not None

self.resp = resp
Expand All @@ -138,6 +143,7 @@ def acquire(self):
self.lock and self.lock.acquire() # pyright: ignore[reportUnusedExpression]

def open_tmp(self):
self.tmp.parent.mkdir(parents=True, exist_ok=True)
self.fh = open(self.tmp, "wb")

def write(self, chunk: bytes):
Expand Down Expand Up @@ -165,8 +171,8 @@ def finalize(self):
self.lock.release()


class _TeeToDisk(httpx.SyncByteStream):
def __init__(self, resp: httpx.Response, path: Path, locking: bool, last_modified: str, access_date: str) -> None:
class _TeeToDisk(httpx2.SyncByteStream):
def __init__(self, resp: httpx2.Response, path: Path, locking: bool, last_modified: str, access_date: str) -> None:
self.core = _TeeCore(resp, path, locking, last_modified, access_date)

def __iter__(self) -> Iterator[bytes]:
Expand All @@ -186,8 +192,8 @@ def close(self) -> None:
self.core.finalize()


class _AsyncTeeToDisk(httpx.AsyncByteStream):
def __init__(self, resp: httpx.Response, path: Path, locking: bool, last_modified: str, access_date: str):
class _AsyncTeeToDisk(httpx2.AsyncByteStream):
def __init__(self, resp: httpx2.Response, path: Path, locking: bool, last_modified: str, access_date: str):
self.resp = resp
self.path = path
self.tmp = path.with_name(path.name + ".tmp")
Expand All @@ -206,6 +212,7 @@ async def __aiter__(self):
if self.lock:
await self.lock.acquire()
try:
self.tmp.parent.mkdir(parents=True, exist_ok=True)
async with aiofiles.open(self.tmp, "wb") as f:
async for chunk in self.resp.aiter_raw():
await f.write(chunk)
Expand All @@ -218,34 +225,35 @@ async def __aiter__(self):
}
await m.write(json.dumps({"fetched": self.atime, "origin_lm": self.mtime, "headers": headers}))
finally:
if self.lock:
if self.lock and getattr(self.lock, "is_locked", False):
await self.lock.release()

async def aclose(self):
try:
await self.resp.aclose()
finally:
if self.lock:
if self.lock and getattr(self.lock, "is_locked", False):
await self.lock.release()


class CachingTransport(httpx.BaseTransport, httpx.AsyncBaseTransport):
class CachingTransport(httpx2.BaseTransport, httpx2.AsyncBaseTransport):
cache_rules: dict[str, dict[str, Union[bool, int]]]
streaming_cutoff: int = 8 * 1024 * 1024

transport: httpx.HTTPTransport
_cache: FileCache
transport: httpx2.HTTPTransport
_cache: FileCache

def __init__(
self,
cache_dir: Union[str, Path],
cache_rules: dict[str, dict[str, Union[bool, int]]],
transport: Optional[httpx.BaseTransport] = None,
transport: Optional[httpx2.BaseTransport] = None,
):
self._cache = FileCache(cache_dir=cache_dir, locking=True)
self.transport = transport or httpx.HTTPTransport()
self.transport = transport or httpx2.HTTPTransport()
self.cache_rules = cache_rules

def _cache_hit_response(self, req: httpx.Request, path: Path, status_code: int = 200):
def _cache_hit_response(self, req: httpx2.Request, path: Path, status_code: int = 200):
"""
TODO: More carefully consider async here. read_text, read_bytes both are blocking.

Expand All @@ -272,22 +280,28 @@ def _cache_hit_response(self, req: httpx.Request, path: Path, status_code: int =

if size < self.streaming_cutoff:
# If the file is small, just read it and return it
return httpx.Response(
return httpx2.Response(
status_code=status_code,
headers=headers,
content=path.read_bytes(),
request=req,
)
else:
# If the file is large, stream it
return httpx.Response(
return httpx2.Response(
status_code=status_code,
headers=headers,
stream=DualFileStream(path),
request=req,
)

def _cache_miss_response(self, req: httpx.Request, net: httpx.Response, path: Path, tee_factory):
def _cache_miss_response(
self,
req: httpx2.Request,
net: httpx2.Response,
path: Path,
tee_factory: Callable[..., Union[httpx2.SyncByteStream, httpx2.AsyncByteStream]],
):
if net.status_code != 200:
return net

Expand All @@ -297,7 +311,7 @@ def _cache_miss_response(self, req: httpx.Request, net: httpx.Response, path: Pa
if k.lower() not in ("transfer-encoding",) # "content-encoding", "content-length", "transfer-encoding")
]
miss_headers.append(("x-cache", "MISS"))
return httpx.Response(
return httpx2.Response(
status_code=net.status_code,
headers=miss_headers,
stream=tee_factory(
Expand All @@ -307,7 +321,7 @@ def _cache_miss_response(self, req: httpx.Request, net: httpx.Response, path: Pa
extensions={**net.extensions, "decode_content": False},
)

def return_if_fresh(self, request: httpx.Request) -> Tuple[Optional[httpx.Response], Optional[Path]]:
def return_if_fresh(self, request: httpx2.Request) -> Tuple[Optional[httpx2.Response], Optional[Path]]:
host = request.url.host
path = request.url.path
query = request.url.query.decode() if request.url.query else ""
Expand All @@ -327,7 +341,7 @@ def return_if_fresh(self, request: httpx.Request) -> Tuple[Optional[httpx.Respon
else:
return None, None

def handle_request(self, request: httpx.Request) -> httpx.Response:
def handle_request(self, request: httpx2.Request) -> httpx2.Response:
if request.method != "GET":
return self.transport.handle_request(request)

Expand All @@ -348,15 +362,18 @@ def handle_request(self, request: httpx.Request) -> httpx.Response:
path = self._cache.to_path(host, path, query)
return self._cache_miss_response(request, net, path, _TeeToDisk)

async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
async def handle_async_request(self, request: httpx2.Request) -> httpx2.Response:
# self.transport holds an async transport in the async path; the attribute is
# typed sync (httpx2.HTTPTransport) for the common case, so cast for async use.
transport = cast(httpx2.AsyncBaseTransport, self.transport)
if request.method != "GET":
return await self.transport.handle_async_request(request) # type: ignore[attr-defined]
return await transport.handle_async_request(request)

response, path = self.return_if_fresh(request)
if response:
return response

net: httpx.Response = await self.transport.handle_async_request(request)
net: httpx2.Response = await transport.handle_async_request(request)
if net.status_code == 304:
assert path is not None # must be true
logger.info("304 for %s", request)
Expand Down
Loading
Loading