From 57ed1df5cf4a9edaf5060a396c6a833d860af7c9 Mon Sep 17 00:00:00 2001 From: kraysent Date: Mon, 1 Jun 2026 19:29:31 +0100 Subject: [PATCH] show current progress more verbosely --- frontend/src/components/ProgressView.tsx | 48 +++++++++++++++++-- tests/test_server_integration.py | 4 +- tests/test_upload.py | 2 +- uploader/app/crossmatch/engine.py | 5 +- uploader/app/crossmatch/submit.py | 5 +- uploader/app/interface.py | 6 +-- uploader/app/report.py | 3 +- uploader/app/sources/csv.py | 5 +- uploader/app/sources/fits.py | 5 +- uploader/app/sources/vizier.py | 4 +- .../app/structured/designations/upload.py | 11 +++-- uploader/app/structured/geometry/upload.py | 5 +- uploader/app/structured/icrs/upload.py | 5 +- uploader/app/structured/nature/upload.py | 5 +- uploader/app/structured/note.py | 2 +- uploader/app/structured/photometry/upload.py | 5 +- uploader/app/structured/redshift/upload.py | 5 +- uploader/app/upload.py | 11 ++--- uploader/forms/authenticate.py | 2 +- uploader/tasks.py | 7 +-- 20 files changed, 87 insertions(+), 58 deletions(-) diff --git a/frontend/src/components/ProgressView.tsx b/frontend/src/components/ProgressView.tsx index f78072e..aaaa289 100644 --- a/frontend/src/components/ProgressView.tsx +++ b/frontend/src/components/ProgressView.tsx @@ -9,7 +9,7 @@ import Typography from "@mui/material/Typography"; import { cancelRun } from "../api"; type StreamEvent = - | { type: "progress"; percent: number } + | { type: "progress"; current: number; total: number } | { type: "log"; message: string } | { type: "image"; @@ -35,6 +35,19 @@ function formatStreamTime(iso: string): string { }); } +function progressPercent(current: number, total: number): number { + if (total <= 0) return 0; + return Math.min(100, Math.max(0, Math.ceil((100 * current) / total))); +} + +function formatRate(rate: number): string { + if (rate >= 1_000_000) return `${(rate / 1_000_000).toFixed(1)}M/s`; + if (rate >= 1_000) return `${(rate / 1_000).toFixed(1)}k/s`; + if (rate >= 10) return `${Math.round(rate)}/s`; + if (rate >= 1) return `${rate.toFixed(1)}/s`; + return `${rate.toFixed(2)}/s`; +} + function isNearBottom(el: HTMLElement, threshold = 48) { return el.scrollHeight - el.scrollTop - el.clientHeight <= threshold; } @@ -46,7 +59,9 @@ export function ProgressView({ runId: string; onReset: () => void; }) { - const [percent, setPercent] = useState(0); + const [current, setCurrent] = useState(0); + const [total, setTotal] = useState(0); + const [rate, setRate] = useState(null); const [logs, setLogs] = useState([]); const [images, setImages] = useState([]); const [imageIndex, setImageIndex] = useState(0); @@ -58,6 +73,9 @@ export function ProgressView({ const logContainerRef = useRef(null); const stickToBottomRef = useRef(true); const stickToLatestImageRef = useRef(true); + const lastProgressRef = useRef<{ current: number; at: number } | null>(null); + + const percent = progressPercent(current, total); function handleLogScroll() { const el = logContainerRef.current; @@ -72,15 +90,28 @@ export function ProgressView({ } useEffect(() => { + setCurrent(0); + setTotal(0); + setRate(null); + lastProgressRef.current = null; stickToBottomRef.current = true; stickToLatestImageRef.current = true; const es = new EventSource(`/api/runs/${runId}/stream`); es.onmessage = (e) => { try { const ev = JSON.parse(e.data) as StreamEvent; - if (ev.type === "progress") - setPercent(Math.min(100, Math.max(0, Math.ceil(ev.percent)))); - else if (ev.type === "log") setLogs((x) => [...x, ev.message]); + if (ev.type === "progress") { + const now = Date.now(); + const prev = lastProgressRef.current; + if (prev !== null) { + const dt = (now - prev.at) / 1000; + const delta = ev.current - prev.current; + if (dt > 0 && delta > 0) setRate(delta / dt); + } + lastProgressRef.current = { current: ev.current, at: now }; + setCurrent(ev.current); + setTotal(ev.total); + } else if (ev.type === "log") setLogs((x) => [...x, ev.message]); else if (ev.type === "image") { setImages((prev) => { const next = [ @@ -135,6 +166,13 @@ export function ProgressView({ /> {percent}% + {total > 0 && ( + <> + {" "} + · {current.toLocaleString()} / {total.toLocaleString()} + + )} + {rate !== null && <> · {formatRate(rate)}} {error && ( diff --git a/tests/test_server_integration.py b/tests/test_server_integration.py index f778ad6..e7aa578 100644 --- a/tests/test_server_integration.py +++ b/tests/test_server_integration.py @@ -35,7 +35,7 @@ def isolated_task_state(monkeypatch: pytest.MonkeyPatch, tmp_path: Any) -> Itera def test_task_integration_flow(isolated_task_state: None) -> None: def fake_handler(form: FakeTaskForm, emit: Callable[[report.Event], None]) -> None: emit(report.LogEvent(message=f"Start for {form.name}")) - emit(report.ProgressEvent(percent=50)) + emit(report.ProgressEvent(current=1, total=2)) emit(report.DoneEvent(message=f"Completed {form.name}")) fake_task = tasks.TaskDefinition( @@ -102,7 +102,7 @@ def test_task_cancel_flow(isolated_task_state: None) -> None: def cancellable_handler(form: FakeTaskForm, emit: Callable[[report.Event], None]) -> None: for idx in range(20): emit(report.LogEvent(message=f"Step {idx} for {form.name}")) - emit(report.ProgressEvent(percent=(idx + 1) * 5)) + emit(report.ProgressEvent(current=idx + 1, total=20)) time.sleep(0.05) emit(report.DoneEvent(message=f"Completed {form.name}")) diff --git a/tests/test_upload.py b/tests/test_upload.py index f948212..6ca9b31 100644 --- a/tests/test_upload.py +++ b/tests/test_upload.py @@ -24,7 +24,7 @@ def prepare(self) -> None: def get_schema(self) -> list[models.ColumnDescription]: return [] - def get_data(self) -> Generator[tuple[pandas.DataFrame, float]]: + def get_data(self) -> Generator[tuple[pandas.DataFrame, int, int]]: if self.should_raise: raise Exception("Test error") return None diff --git a/uploader/app/crossmatch/engine.py b/uploader/app/crossmatch/engine.py index 7c2e450..1b29504 100644 --- a/uploader/app/crossmatch/engine.py +++ b/uploader/app/crossmatch/engine.py @@ -451,8 +451,7 @@ def run_crossmatch( message=(f"Batch processed: {batch_processed} objects; manual check: {batch_pending} objects.") ) ) - progress = 100.0 if total_records == 0 else (100.0 * total / total_records) - report_func(report.ProgressEvent(percent=min(progress, 100.0))) + report_func(report.ProgressEvent(current=total, total=total_records)) _emit_status_distribution_image(report_func, counts, caption=f"{total} records crossmatched") finally: @@ -479,6 +478,6 @@ def pct(n: int) -> float: title=f"Total records: {total}\n", ) - report_func(report.ProgressEvent(percent=100)) + report_func(report.ProgressEvent(current=total_records, total=total_records)) _emit_status_distribution_image(report_func, counts, caption=f"Final: {total} records") report_func(report.DoneEvent(message=summary)) diff --git a/uploader/app/crossmatch/submit.py b/uploader/app/crossmatch/submit.py index e04a33c..844f35d 100644 --- a/uploader/app/crossmatch/submit.py +++ b/uploader/app/crossmatch/submit.py @@ -81,10 +81,9 @@ def run_submit_crossmatch( message=f"Batch submitted: {len(record_ids)} records ({submitted}/{eligible_total}).", ) ) - progress = 100.0 if eligible_total == 0 else (100.0 * submitted / eligible_total) - report_func(report.ProgressEvent(percent=min(progress, 100.0))) + report_func(report.ProgressEvent(current=submitted, total=eligible_total)) - report_func(report.ProgressEvent(percent=100)) + report_func(report.ProgressEvent(current=eligible_total, total=eligible_total)) report_func( report.DoneEvent( message=f"Submitted {submitted}/{eligible_total} records (write={write}).", diff --git a/uploader/app/interface.py b/uploader/app/interface.py index a74816c..4a35632 100644 --- a/uploader/app/interface.py +++ b/uploader/app/interface.py @@ -23,14 +23,12 @@ def get_schema(self) -> list[models.ColumnDescription]: """ @abc.abstractmethod - def get_data(self) -> Generator[tuple[pandas.DataFrame, float]]: + def get_data(self) -> Generator[tuple[pandas.DataFrame, int, int]]: """ Yields DataFrames that represent the data from the table. Not all of the columns from the `get_schema` method must be present but there should be no columns that were not returned from `get_schema`. - This method will yield tuples of (DataFrame, completion_rate) until all data is processed. - - The float returned is the completion rate in the range [0, 1]. It will be displayed to the user. + This method will yield tuples of (DataFrame, current, total) until all data is processed. """ @abc.abstractmethod diff --git a/uploader/app/report.py b/uploader/app/report.py index 8e8be4b..ff4be3d 100644 --- a/uploader/app/report.py +++ b/uploader/app/report.py @@ -16,7 +16,8 @@ class LogEvent: @dataclass(frozen=True) class ProgressEvent: - percent: float + current: int + total: int @dataclass(frozen=True) diff --git a/uploader/app/sources/csv.py b/uploader/app/sources/csv.py index c05c507..452a913 100644 --- a/uploader/app/sources/csv.py +++ b/uploader/app/sources/csv.py @@ -57,14 +57,13 @@ def get_schema(self) -> list[models.ColumnDescription]: ) return columns - def get_data(self) -> Generator[tuple[pandas.DataFrame, float]]: + def get_data(self) -> Generator[tuple[pandas.DataFrame, int, int]]: if self._reader is None: raise RuntimeError("Plugin not prepared. Call prepare() first.") for chunk in self._reader: self._current_chunk += 1 - progress = self._current_chunk / self._total_chunks - yield chunk, progress + yield chunk, self._current_chunk, self._total_chunks def stop(self) -> None: pass diff --git a/uploader/app/sources/fits.py b/uploader/app/sources/fits.py index 4bf6514..2f64caf 100644 --- a/uploader/app/sources/fits.py +++ b/uploader/app/sources/fits.py @@ -58,7 +58,7 @@ def get_schema(self) -> list[models.ColumnDescription]: ) return columns - def get_data(self) -> Generator[tuple[pandas.DataFrame, float]]: + def get_data(self) -> Generator[tuple[pandas.DataFrame, int, int]]: if self._table is None: raise RuntimeError("Plugin not prepared. Call prepare() first.") @@ -70,9 +70,8 @@ def get_data(self) -> Generator[tuple[pandas.DataFrame, float]]: df = pandas.DataFrame(batch_data) self._current_batch += 1 - progress = self._current_batch / self._total_batches - yield df, progress + yield df, self._current_batch, self._total_batches def stop(self) -> None: if self._hdu is not None: diff --git a/uploader/app/sources/vizier.py b/uploader/app/sources/vizier.py index a94879a..39485f8 100644 --- a/uploader/app/sources/vizier.py +++ b/uploader/app/sources/vizier.py @@ -132,7 +132,7 @@ def get_schema(self) -> list[models.ColumnDescription]: for field in table.fields ] - def get_data(self) -> Generator[tuple[pandas.DataFrame, float]]: + def get_data(self) -> Generator[tuple[pandas.DataFrame, int, int]]: if not self._obtain_cache_path("tables", self.catalog_name, self.table_name, ext="csv").exists(): app.logger.debug("did not hit cache for the table, downloading") self._write_table_cache(self.catalog_name, self.table_name) @@ -153,7 +153,7 @@ def get_data(self) -> Generator[tuple[pandas.DataFrame, float]]: row_dict = {_sanitize_column_name(k): v for k, v in dict(row).items() if v != "--"} rows.append(_coerce_row_to_schema(row_dict, schema)) - yield pandas.DataFrame(rows), offset / total_rows + yield pandas.DataFrame(rows), offset, total_rows def get_total_rows(self) -> int: if not self._obtain_cache_path("tables", self.catalog_name, self.table_name, ext="csv").exists(): diff --git a/uploader/app/structured/designations/upload.py b/uploader/app/structured/designations/upload.py index 34a55a4..256807b 100644 --- a/uploader/app/structured/designations/upload.py +++ b/uploader/app/structured/designations/upload.py @@ -59,13 +59,14 @@ def _report_batch_progress( report_func: Callable[[report.Event], None], *, rows_read: int, + processed_rows: int, + total_count: int, total_so_far: int, matched: int, unmatched: int, - progress_pct: int, rule_counts: dict[str, int], ) -> None: - report_func(report.ProgressEvent(percent=min(99, progress_pct))) + report_func(report.ProgressEvent(current=processed_rows, total=total_count)) report_func( report.LogEvent( message=( @@ -97,7 +98,7 @@ def pct(n: int) -> float: ] table_rows.append(("(no rule matched)", unmatched, pct(unmatched))) - report_func(report.ProgressEvent(percent=100)) + report_func(report.ProgressEvent(current=total, total=total)) _emit_rule_distribution_image( report_func, @@ -188,14 +189,14 @@ def total_pct(n: int, t: int = total_so_far) -> float: unmatched=unmatched, unmatched_pct=round(total_pct(unmatched), 1), ) - progress_pct = int(100 * processed_rows / total_count) if total_count else 0 _report_batch_progress( report_func, rows_read=len(rows), + processed_rows=processed_rows, + total_count=total_count, total_so_far=total_so_far, matched=sum(rule_counts.values()), unmatched=unmatched, - progress_pct=progress_pct, rule_counts=rule_counts, ) diff --git a/uploader/app/structured/geometry/upload.py b/uploader/app/structured/geometry/upload.py index f7e39a9..127d091 100644 --- a/uploader/app/structured/geometry/upload.py +++ b/uploader/app/structured/geometry/upload.py @@ -269,8 +269,7 @@ def upload_geometry_isophotal( ) processed_rows += len(rows) - row_pct = int(100 * processed_rows / total_count) if total_count else 0 - report_func(report.ProgressEvent(percent=min(99, row_pct))) + report_func(report.ProgressEvent(current=processed_rows, total=total_count)) report_func( report.LogEvent( message=f"batch: rows_read={len(rows)} uploaded={uploaded} skipped={skipped}", @@ -306,7 +305,7 @@ def row_pct_label(n: int) -> float: ("a mean (arcsec)", round(a_mean, 3), "-"), ], ) - report_func(report.ProgressEvent(percent=100)) + report_func(report.ProgressEvent(current=total_count, total=total_count)) if uploaded > 0: axis_dist.emit_image( report_func, diff --git a/uploader/app/structured/icrs/upload.py b/uploader/app/structured/icrs/upload.py index db98acc..ece478f 100644 --- a/uploader/app/structured/icrs/upload.py +++ b/uploader/app/structured/icrs/upload.py @@ -207,8 +207,7 @@ def upload_icrs( ) processed_rows += len(rows) - row_pct = int(100 * processed_rows / total_count) if total_count else 0 - report_func(report.ProgressEvent(percent=min(99, row_pct))) + report_func(report.ProgressEvent(current=processed_rows, total=total_count)) report_func( report.LogEvent( message=f"batch: rows_read={len(rows)} uploaded={uploaded} skipped={skipped}", @@ -238,7 +237,7 @@ def row_pct_label(n: int) -> float: ("Dec mean", round(dec_mean, 6), "-"), ] ) - report_func(report.ProgressEvent(percent=100)) + report_func(report.ProgressEvent(current=total_count, total=total_count)) sky.emit_image(report_func, caption=f"Final: {uploaded} objects") summary = format_table( ("Status", "Count", "%"), diff --git a/uploader/app/structured/nature/upload.py b/uploader/app/structured/nature/upload.py index 1771bf9..9f834ee 100644 --- a/uploader/app/structured/nature/upload.py +++ b/uploader/app/structured/nature/upload.py @@ -108,8 +108,7 @@ def upload_nature( ) processed_rows += len(rows) - batch_pct = int(100 * processed_rows / total_count) if total_count else 0 - report_func(report.ProgressEvent(percent=min(99, batch_pct))) + report_func(report.ProgressEvent(current=processed_rows, total=total_count)) report_func( report.LogEvent( message=f"batch: rows_read={len(rows)} total_uploaded_so_far={total_uploaded}", @@ -129,7 +128,7 @@ def upload_nature( ) for leda_type, count in sorted(type_counts.items()) ] - report_func(report.ProgressEvent(percent=100)) + report_func(report.ProgressEvent(current=total_count, total=total_count)) _emit_type_distribution_image( report_func, type_counts, diff --git a/uploader/app/structured/note.py b/uploader/app/structured/note.py index 4a322b5..5abf064 100644 --- a/uploader/app/structured/note.py +++ b/uploader/app/structured/note.py @@ -31,5 +31,5 @@ def upload_note( ), ) ) - report_func(report.ProgressEvent(percent=100)) + report_func(report.ProgressEvent(current=1, total=1)) report_func(report.DoneEvent(message="Note uploaded successfully.")) diff --git a/uploader/app/structured/photometry/upload.py b/uploader/app/structured/photometry/upload.py index 96d0f94..517a3cf 100644 --- a/uploader/app/structured/photometry/upload.py +++ b/uploader/app/structured/photometry/upload.py @@ -99,8 +99,7 @@ def upload_photometry_hyperleda( objects=uploaded_objects, photometry_rows=uploaded_rows, ) - progress = 100.0 if total_rows == 0 else (100.0 * total_source_rows / total_rows) - report_func(report.ProgressEvent(percent=min(progress, 100.0))) + report_func(report.ProgressEvent(current=total_source_rows, total=total_rows)) finally: total = uploaded_objects + skipped total_photometry_rows = sum(band_counts.values()) @@ -127,5 +126,5 @@ def pct(n: int, denom: int) -> float: percent_last_column=False, ) - report_func(report.ProgressEvent(percent=100)) + report_func(report.ProgressEvent(current=total_rows, total=total_rows)) report_func(report.DoneEvent(message=summary)) diff --git a/uploader/app/structured/redshift/upload.py b/uploader/app/structured/redshift/upload.py index 4d2e4e4..92202e2 100644 --- a/uploader/app/structured/redshift/upload.py +++ b/uploader/app/structured/redshift/upload.py @@ -139,8 +139,7 @@ def upload_redshift( ) processed_rows += len(rows) - batch_pct = int(100 * processed_rows / total_count) if total_count else 0 - report_func(report.ProgressEvent(percent=min(99, batch_pct))) + report_func(report.ProgressEvent(current=processed_rows, total=total_count)) report_func( report.LogEvent( message=f"batch: rows_read={len(rows)} uploaded={uploaded} skipped={skipped}", @@ -173,7 +172,7 @@ def row_pct_label(n: int) -> float: ("cz mean (km/s)", round(cz_mean, 2), "-"), ] ) - report_func(report.ProgressEvent(percent=100)) + report_func(report.ProgressEvent(current=total_count, total=total_count)) if uploaded > 0: cz_dist.emit_image( report_func, diff --git a/uploader/app/upload.py b/uploader/app/upload.py index cd844e0..11bc4ce 100644 --- a/uploader/app/upload.py +++ b/uploader/app/upload.py @@ -140,7 +140,7 @@ def _upload( estimated_total_size_bytes = estimated_total_rows * estimated_row_size_bytes skip_unique_stats = estimated_total_size_bytes >= MAX_UNIQUE_STATS_BYTES unique_values_by_column: dict[str, set[Any]] = {column.name: set() for column in schema} - prev_percent = 0 + prev_current = 0 def process_chunk(data: pd.DataFrame) -> None: nonlocal total_rows @@ -187,12 +187,11 @@ def process_chunk(data: pd.DataFrame) -> None: report_func(report.LogEvent(message=size_estimate_msg)) - for data, progress in data_iter: + for data, current, total in data_iter: process_chunk(data) - percent = int(progress * 100) - if percent != prev_percent: - report_func(report.ProgressEvent(percent=percent)) - prev_percent = percent + if current != prev_current: + report_func(report.ProgressEvent(current=current, total=total)) + prev_current = current report_func(report.LogEvent(message=f"\nTotal rows: {total_rows}")) diff --git a/uploader/forms/authenticate.py b/uploader/forms/authenticate.py index d7e9f95..d65e2dd 100644 --- a/uploader/forms/authenticate.py +++ b/uploader/forms/authenticate.py @@ -47,5 +47,5 @@ def handle_authenticate(form: BaseModel, report_func: Callable[[report.Event], N save_token(response.data.token) report_func(report.LogEvent(message="Backend login successful.")) - report_func(report.ProgressEvent(percent=100)) + report_func(report.ProgressEvent(current=1, total=1)) report_func(report.DoneEvent(message="Credentials saved.")) diff --git a/uploader/tasks.py b/uploader/tasks.py index ecba270..cec2895 100644 --- a/uploader/tasks.py +++ b/uploader/tasks.py @@ -86,13 +86,14 @@ def append_report_event(event: report.Event) -> None: message=out, ) run.append({"type": "log", "message": out}) - case report.ProgressEvent(percent=pct): + case report.ProgressEvent(current=current, total=total): logger.info( "progress event", task_id=task_id, - percent=pct, + current=current, + total=total, ) - run.append({"type": "progress", "percent": pct}) + run.append({"type": "progress", "current": current, "total": total}) case report.DoneEvent(message=msg): logger.info( "finish event",