Skip to content

Commit 28dac95

Browse files
authored
refactor fixture-updater to python (#14336)
1 parent 3babf4a commit 28dac95

3 files changed

Lines changed: 280 additions & 2 deletions

File tree

.github/workflows/update-sample-data.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@ jobs:
2020
with:
2121
ref: ${{ github.ref_name || 'dev'}}
2222

23-
- name: Run binary
23+
- name: Run updater
2424
run: |
25-
./fixture-updater dojo/fixtures/defect_dojo_sample_data.json
25+
scripts/fixture-updater.py dojo/fixtures/defect_dojo_sample_data.json
2626
mv output.json dojo/fixtures/defect_dojo_sample_data.json
2727
2828
- name: Configure git

fixture-updater

-9.83 MB
Binary file not shown.

scripts/fixture-updater.py

Lines changed: 278 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,278 @@
1+
#!/usr/bin/env python3
2+
3+
import argparse
4+
import json
5+
import logging
6+
import re
7+
from collections.abc import Generator
8+
from dataclasses import dataclass
9+
from datetime import UTC, datetime, timedelta
10+
from pathlib import Path
11+
from time import perf_counter
12+
from typing import Any
13+
14+
DATETIME_FIELD_PATTERN = re.compile(
15+
r"^(?P<date>\d{4}-\d{2}-\d{2})T"
16+
r"(?P<time>\d{2}:\d{2}:\d{2})"
17+
r"(?:\.(?P<fraction>\d{1,6}))?Z$",
18+
)
19+
DATE_FIELD_PATTERN = re.compile(r"^(?P<date>\d{4}-\d{2}-\d{2})$")
20+
logger = logging.getLogger(__name__)
21+
22+
23+
@dataclass(frozen=True)
24+
class ParsedUtcTimestamp:
25+
26+
"""
27+
UTC timestamp with preserved formatting metadata.
28+
29+
>>> parsed = ParsedUtcTimestamp.parse("2024-01-02")
30+
>>> parsed is not None
31+
True
32+
>>> parsed.dt.isoformat()
33+
'2024-01-02T00:00:00+00:00'
34+
>>> parsed.value_type is DATE_FIELD_PATTERN
35+
True
36+
>>> parsed.fraction_len
37+
0
38+
39+
>>> parsed = ParsedUtcTimestamp.parse("2024-01-02T03:04:05.12Z")
40+
>>> parsed is not None
41+
True
42+
>>> parsed.dt.isoformat()
43+
'2024-01-02T03:04:05.120000+00:00'
44+
>>> parsed.value_type is DATETIME_FIELD_PATTERN
45+
True
46+
>>> parsed.fraction_len
47+
2
48+
49+
>>> ParsedUtcTimestamp.parse("2024-01-02T03:04:05.12Zx") is None
50+
True
51+
>>> ParsedUtcTimestamp.parse("2024-01-02T03:04:05Z") is not None
52+
True
53+
54+
>>> dt = datetime.fromisoformat("2024-01-02T03:04:05.123456+00:00")
55+
>>> ParsedUtcTimestamp(dt, DATETIME_FIELD_PATTERN, 2).format()
56+
'2024-01-02T03:04:05.12Z'
57+
>>> ParsedUtcTimestamp(dt, DATE_FIELD_PATTERN, 0).format()
58+
'2024-01-02'
59+
"""
60+
61+
dt: datetime
62+
value_type: re.Pattern[str]
63+
fraction_len: int
64+
65+
@classmethod
66+
def parse(cls, value: str) -> "ParsedUtcTimestamp | None":
67+
match = DATETIME_FIELD_PATTERN.match(value)
68+
if match:
69+
fraction = match.group("fraction") or ""
70+
padded_fraction = (fraction + "000000")[:6]
71+
timestamp = f"{match.group('date')}T{match.group('time')}.{padded_fraction}+00:00"
72+
parsed = datetime.fromisoformat(timestamp)
73+
return cls(parsed, DATETIME_FIELD_PATTERN, len(fraction))
74+
75+
date_only_match = DATE_FIELD_PATTERN.match(value)
76+
if date_only_match:
77+
parsed = datetime.fromisoformat(f"{date_only_match.group('date')}T00:00:00+00:00")
78+
return cls(parsed, DATE_FIELD_PATTERN, 0)
79+
80+
return None
81+
82+
def format(self) -> str:
83+
value = self.dt.astimezone(UTC)
84+
if self.value_type is DATE_FIELD_PATTERN:
85+
return value.date().isoformat()
86+
87+
base = value.strftime("%Y-%m-%dT%H:%M:%S")
88+
if self.fraction_len > 0:
89+
micro = f"{value.microsecond:06d}"[: self.fraction_len]
90+
return f"{base}.{micro}Z"
91+
return f"{base}Z"
92+
93+
def shifted(self, delta: timedelta) -> "ParsedUtcTimestamp":
94+
return ParsedUtcTimestamp(self.dt + delta, self.value_type, self.fraction_len)
95+
96+
97+
def iter_string_nodes(value: Any) -> Generator[tuple[dict[str, Any] | list[Any], str | int, str]]:
98+
"""
99+
Yield mutable container references for every nested string value.
100+
101+
>>> data = {"a": "x", "b": [1, {"c": "y"}]}
102+
>>> list(iter_string_nodes(data))
103+
[({'a': 'x', 'b': [1, {'c': 'y'}]}, 'a', 'x'), ({'c': 'y'}, 'c', 'y')]
104+
"""
105+
if isinstance(value, dict):
106+
for key, item in value.items():
107+
if isinstance(item, str):
108+
yield value, key, item
109+
else:
110+
yield from iter_string_nodes(item)
111+
elif isinstance(value, list):
112+
for idx, item in enumerate(value):
113+
if isinstance(item, str):
114+
yield value, idx, item
115+
else:
116+
yield from iter_string_nodes(item)
117+
118+
119+
def parse_target_latest_time(value: str) -> datetime:
    """
    Parse CLI `--latest-time` values.

    Intended as an argparse ``type=`` callable: returns the timezone-aware
    datetime for a valid value and raises ``argparse.ArgumentTypeError`` with
    a usage hint otherwise.

    >>> parse_target_latest_time("2024-01-02").isoformat()
    '2024-01-02T00:00:00+00:00'
    >>> parse_target_latest_time("oops")
    Traceback (most recent call last):
    ...
    argparse.ArgumentTypeError: Invalid --latest-time. Expected YYYY-MM-DD or YYYY-MM-DDTHH:MM:SS(.fraction)Z.
    """
    try:
        parsed = ParsedUtcTimestamp.parse(value)
    except ValueError:
        # A well-shaped but impossible value (e.g. month 13) can be rejected
        # with ValueError while the datetime is being built; report it as the
        # same usage error instead of argparse's generic "invalid value" text.
        parsed = None
    if parsed is None:
        msg = "Invalid --latest-time. Expected YYYY-MM-DD or YYYY-MM-DDTHH:MM:SS(.fraction)Z."
        raise argparse.ArgumentTypeError(msg)
    return parsed.dt
135+
136+
137+
class FixtureUpdater:
138+
def __init__(self, fixture_path: Path, output_path: Path, target_latest_dt: datetime | None = None) -> None:
139+
self.fixture_path = fixture_path
140+
self.output_path = output_path
141+
self.target_latest_dt = target_latest_dt
142+
self.data: list[dict[str, Any]] = []
143+
self.found_dates: list[tuple[dict[str, Any] | list[Any], str | int, ParsedUtcTimestamp]] = []
144+
self.latest: ParsedUtcTimestamp | None = None
145+
self.delta: timedelta | None = None
146+
self.updated_count = 0
147+
self.elapsed_ms = 0
148+
149+
def load_fixture(self) -> None:
150+
data = json.loads(self.fixture_path.read_text())
151+
if not isinstance(data, list):
152+
msg = "Fixture JSON must be an array at the top level."
153+
raise TypeError(msg)
154+
for idx, item in enumerate(data):
155+
if not isinstance(item, dict):
156+
msg = f"Fixture item at index {idx} is not an object."
157+
raise TypeError(msg)
158+
fields = item.get("fields")
159+
if not isinstance(fields, dict):
160+
msg = f'Fixture item at index {idx} is missing a valid "fields" object.'
161+
raise TypeError(msg)
162+
self.data = data
163+
164+
def collect_dates(self) -> None:
165+
for obj in self.data:
166+
for container, key, item in iter_string_nodes(obj["fields"]):
167+
parsed = ParsedUtcTimestamp.parse(item)
168+
if parsed:
169+
self.found_dates.append((container, key, parsed))
170+
171+
def compute_shift(self) -> None:
172+
"""
173+
Compute the delta between fixture latest timestamp and target timestamp.
174+
175+
>>> updater = FixtureUpdater(Path("in.json"), Path("out.json"), parse_target_latest_time("2024-01-03"))
176+
>>> parsed = ParsedUtcTimestamp.parse("2024-01-01")
177+
>>> parsed is not None
178+
True
179+
>>> updater.found_dates = [({"x": "2024-01-01"}, "x", parsed)]
180+
>>> updater.compute_shift()
181+
>>> updater.delta == timedelta(days=2)
182+
True
183+
"""
184+
_, _, self.latest = max(self.found_dates, key=lambda value: value[2].dt)
185+
target = self.target_latest_dt or datetime.now(UTC)
186+
self.delta = target - self.latest.dt
187+
188+
def apply_shift(self) -> int:
189+
"""
190+
Apply previously computed delta to all collected timestamp fields.
191+
192+
>>> updater = FixtureUpdater(Path("in.json"), Path("out.json"))
193+
>>> parsed = ParsedUtcTimestamp.parse("2024-01-01T00:00:00Z")
194+
>>> parsed is not None
195+
True
196+
>>> container = {"x": "2024-01-01T00:00:00Z"}
197+
>>> updater.found_dates = [(container, "x", parsed)]
198+
>>> updater.delta = timedelta(days=1)
199+
>>> updater.apply_shift()
200+
1
201+
>>> container["x"]
202+
'2024-01-02T00:00:00Z'
203+
"""
204+
if self.delta is None:
205+
msg = "Cannot apply shift before computing delta."
206+
raise RuntimeError(msg)
207+
for container, key, parsed in self.found_dates:
208+
container[key] = parsed.shifted(self.delta).format()
209+
return len(self.found_dates)
210+
211+
def write_output(self) -> None:
212+
self.output_path.write_text(json.dumps(self.data, indent=2))
213+
214+
def run(self) -> None:
215+
started_at = perf_counter()
216+
self.load_fixture()
217+
self.collect_dates()
218+
if not self.found_dates:
219+
self.elapsed_ms = int((perf_counter() - started_at) * 1000)
220+
return
221+
222+
self.compute_shift()
223+
self.updated_count = self.apply_shift()
224+
self.write_output()
225+
self.elapsed_ms = int((perf_counter() - started_at) * 1000)
226+
227+
def report(self) -> None:
228+
if self.latest is None or self.delta is None:
229+
logger.info("No matching UTC date strings found. No changes made.")
230+
logger.info("Completed in %dms!", self.elapsed_ms)
231+
return
232+
233+
logger.info("Dates moved up by %.1f days", self.delta.total_seconds() / 86400)
234+
logger.info("Updated %d date value(s).", self.updated_count)
235+
logger.info(
236+
"Most recent original timestamp: %s",
237+
self.latest.format(),
238+
)
239+
logger.info(
240+
"New most recent timestamp: %s",
241+
self.latest.shifted(self.delta).format(),
242+
)
243+
logger.info("Wrote updated fixture to: %s", self.output_path)
244+
logger.info("Completed in %dms!", self.elapsed_ms)
245+
246+
247+
def main() -> None:
    """Command-line entry point: parse arguments, run the updater, log a summary."""
    logging.basicConfig(format="%(levelname)s %(message)s", level=logging.INFO)

    summary = (
        "Shift date values under each fixture object's 'fields' (supports "
        "YYYY-MM-DDTHH:MM:SS(.fraction)Z and YYYY-MM-DD) so the most recent "
        "detected value becomes the current UTC datetime."
    )
    cli = argparse.ArgumentParser(
        description=summary,
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    # Positional: the fixture to read.
    cli.add_argument("fixture_file", help="Path to a Django fixture JSON file", type=Path)
    # Where the shifted fixture is written.
    cli.add_argument(
        "-o",
        "--output-file",
        help="Path to output JSON file",
        type=Path,
        default="output.json",
    )
    # Optional override for the "now" the newest timestamp is moved to.
    cli.add_argument(
        "--latest-time",
        help="Custom UTC target for the most recent fixture timestamp",
        type=parse_target_latest_time,
    )
    options = cli.parse_args()

    job = FixtureUpdater(options.fixture_file, options.output_file, options.latest_time)
    job.run()
    job.report()


if __name__ == "__main__":
    main()

0 commit comments

Comments
 (0)