Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 43 additions & 5 deletions frontend/src/components/ProgressView.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import Typography from "@mui/material/Typography";
import { cancelRun } from "../api";

type StreamEvent =
| { type: "progress"; percent: number }
| { type: "progress"; current: number; total: number }
| { type: "log"; message: string }
| {
type: "image";
Expand All @@ -35,6 +35,19 @@ function formatStreamTime(iso: string): string {
});
}

/**
 * Converts a `current` / `total` pair into a whole-number percentage.
 *
 * The ratio is rounded up and clamped to the range [0, 100].
 *
 * @param current - Amount of work completed so far.
 * @param total - Total amount of work expected.
 * @returns An integer in [0, 100]. Returns 0 when `total` is not a positive
 *   number, or when the ratio cannot be computed (the result is NaN).
 */
function progressPercent(current: number, total: number): number {
  // `!(total > 0)` rejects zero, negatives and NaN in a single comparison;
  // a plain `total <= 0` check lets NaN through.
  if (!(total > 0)) return 0;
  const rounded = Math.ceil((100 * current) / total);
  // Math.min/Math.max propagate NaN, which would be displayed as "NaN%".
  if (Number.isNaN(rounded)) return 0;
  return Math.min(100, Math.max(0, rounded));
}

/**
 * Renders a per-second rate as a short label such as "2.5M/s", "1.5k/s",
 * "42/s", "2.5/s" or "0.25/s".
 *
 * Tiers are tried from largest to smallest; the first whose lower bound the
 * rate reaches decides the formatting. Rates below 1 (and anything that
 * matches no tier) are shown with two decimals.
 *
 * @param rate - Items processed per second.
 * @returns The formatted rate with a "/s" suffix.
 */
function formatRate(rate: number): string {
  const tiers: ReadonlyArray<readonly [number, (value: number) => string]> = [
    [1_000_000, (value) => (value / 1_000_000).toFixed(1) + "M"],
    [1_000, (value) => (value / 1_000).toFixed(1) + "k"],
    [10, (value) => String(Math.round(value))],
    [1, (value) => value.toFixed(1)],
  ];
  for (const [lowerBound, render] of tiers) {
    if (rate >= lowerBound) {
      return render(rate) + "/s";
    }
  }
  return rate.toFixed(2) + "/s";
}

/**
 * Reports whether a scrollable element is scrolled to, or close to, its end.
 *
 * @param el - Element whose scroll metrics are inspected.
 * @param threshold - Largest remaining scroll distance, in the same units as
 *   the element's scroll metrics, that still counts as "near the bottom".
 * @returns `true` when the content left below the visible area is at most
 *   `threshold`.
 */
function isNearBottom(el: HTMLElement, threshold = 48) {
  const remainingBelow = el.scrollHeight - el.scrollTop - el.clientHeight;
  return remainingBelow <= threshold;
}
Expand All @@ -46,7 +59,9 @@ export function ProgressView({
runId: string;
onReset: () => void;
}) {
const [percent, setPercent] = useState(0);
const [current, setCurrent] = useState(0);
const [total, setTotal] = useState(0);
const [rate, setRate] = useState<number | null>(null);
const [logs, setLogs] = useState<string[]>([]);
const [images, setImages] = useState<StreamImage[]>([]);
const [imageIndex, setImageIndex] = useState(0);
Expand All @@ -58,6 +73,9 @@ export function ProgressView({
const logContainerRef = useRef<HTMLDivElement>(null);
const stickToBottomRef = useRef(true);
const stickToLatestImageRef = useRef(true);
const lastProgressRef = useRef<{ current: number; at: number } | null>(null);

const percent = progressPercent(current, total);

function handleLogScroll() {
const el = logContainerRef.current;
Expand All @@ -72,15 +90,28 @@ export function ProgressView({
}

useEffect(() => {
setCurrent(0);
setTotal(0);
setRate(null);
lastProgressRef.current = null;
stickToBottomRef.current = true;
stickToLatestImageRef.current = true;
const es = new EventSource(`/api/runs/${runId}/stream`);
es.onmessage = (e) => {
try {
const ev = JSON.parse(e.data) as StreamEvent;
if (ev.type === "progress")
setPercent(Math.min(100, Math.max(0, Math.ceil(ev.percent))));
else if (ev.type === "log") setLogs((x) => [...x, ev.message]);
if (ev.type === "progress") {
const now = Date.now();
const prev = lastProgressRef.current;
if (prev !== null) {
const dt = (now - prev.at) / 1000;
const delta = ev.current - prev.current;
if (dt > 0 && delta > 0) setRate(delta / dt);
}
lastProgressRef.current = { current: ev.current, at: now };
setCurrent(ev.current);
setTotal(ev.total);
} else if (ev.type === "log") setLogs((x) => [...x, ev.message]);
else if (ev.type === "image") {
setImages((prev) => {
const next = [
Expand Down Expand Up @@ -135,6 +166,13 @@ export function ProgressView({
/>
<Typography variant="body2" color="text.secondary" sx={{ mb: 2 }}>
{percent}%
{total > 0 && (
<>
{" "}
· {current.toLocaleString()} / {total.toLocaleString()}
</>
)}
{rate !== null && <> · {formatRate(rate)}</>}
</Typography>
{error && (
<Alert severity="error" sx={{ mb: 2 }}>
Expand Down
4 changes: 2 additions & 2 deletions tests/test_server_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def isolated_task_state(monkeypatch: pytest.MonkeyPatch, tmp_path: Any) -> Itera
def test_task_integration_flow(isolated_task_state: None) -> None:
def fake_handler(form: FakeTaskForm, emit: Callable[[report.Event], None]) -> None:
emit(report.LogEvent(message=f"Start for {form.name}"))
emit(report.ProgressEvent(percent=50))
emit(report.ProgressEvent(current=1, total=2))
emit(report.DoneEvent(message=f"Completed {form.name}"))

fake_task = tasks.TaskDefinition(
Expand Down Expand Up @@ -102,7 +102,7 @@ def test_task_cancel_flow(isolated_task_state: None) -> None:
def cancellable_handler(form: FakeTaskForm, emit: Callable[[report.Event], None]) -> None:
for idx in range(20):
emit(report.LogEvent(message=f"Step {idx} for {form.name}"))
emit(report.ProgressEvent(percent=(idx + 1) * 5))
emit(report.ProgressEvent(current=idx + 1, total=20))
time.sleep(0.05)
emit(report.DoneEvent(message=f"Completed {form.name}"))

Expand Down
2 changes: 1 addition & 1 deletion tests/test_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def prepare(self) -> None:
def get_schema(self) -> list[models.ColumnDescription]:
return []

def get_data(self) -> Generator[tuple[pandas.DataFrame, float]]:
def get_data(self) -> Generator[tuple[pandas.DataFrame, int, int]]:
if self.should_raise:
raise Exception("Test error")
return None
Expand Down
5 changes: 2 additions & 3 deletions uploader/app/crossmatch/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -451,8 +451,7 @@ def run_crossmatch(
message=(f"Batch processed: {batch_processed} objects; manual check: {batch_pending} objects.")
)
)
progress = 100.0 if total_records == 0 else (100.0 * total / total_records)
report_func(report.ProgressEvent(percent=min(progress, 100.0)))
report_func(report.ProgressEvent(current=total, total=total_records))
_emit_status_distribution_image(report_func, counts, caption=f"{total} records crossmatched")
finally:

Expand All @@ -479,6 +478,6 @@ def pct(n: int) -> float:
title=f"Total records: {total}\n",
)

report_func(report.ProgressEvent(percent=100))
report_func(report.ProgressEvent(current=total_records, total=total_records))
_emit_status_distribution_image(report_func, counts, caption=f"Final: {total} records")
report_func(report.DoneEvent(message=summary))
5 changes: 2 additions & 3 deletions uploader/app/crossmatch/submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,9 @@ def run_submit_crossmatch(
message=f"Batch submitted: {len(record_ids)} records ({submitted}/{eligible_total}).",
)
)
progress = 100.0 if eligible_total == 0 else (100.0 * submitted / eligible_total)
report_func(report.ProgressEvent(percent=min(progress, 100.0)))
report_func(report.ProgressEvent(current=submitted, total=eligible_total))

report_func(report.ProgressEvent(percent=100))
report_func(report.ProgressEvent(current=eligible_total, total=eligible_total))
report_func(
report.DoneEvent(
message=f"Submitted {submitted}/{eligible_total} records (write={write}).",
Expand Down
6 changes: 2 additions & 4 deletions uploader/app/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,12 @@ def get_schema(self) -> list[models.ColumnDescription]:
"""

@abc.abstractmethod
def get_data(self) -> Generator[tuple[pandas.DataFrame, float]]:
def get_data(self) -> Generator[tuple[pandas.DataFrame, int, int]]:
"""
Yields DataFrames that represent the data from the table.
Not all of the columns from the `get_schema` method must be present but there should be no columns
that were not returned from `get_schema`.
This method will yield tuples of (DataFrame, completion_rate) until all data is processed.

The float returned is the completion rate in the range [0, 1]. It will be displayed to the user.
This method will yield tuples of (DataFrame, current, total) until all data is processed.
"""

@abc.abstractmethod
Expand Down
3 changes: 2 additions & 1 deletion uploader/app/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ class LogEvent:

@dataclass(frozen=True)
class ProgressEvent:
percent: float
current: int
total: int


@dataclass(frozen=True)
Expand Down
5 changes: 2 additions & 3 deletions uploader/app/sources/csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,13 @@ def get_schema(self) -> list[models.ColumnDescription]:
)
return columns

def get_data(self) -> Generator[tuple[pandas.DataFrame, float]]:
def get_data(self) -> Generator[tuple[pandas.DataFrame, int, int]]:
if self._reader is None:
raise RuntimeError("Plugin not prepared. Call prepare() first.")

for chunk in self._reader:
self._current_chunk += 1
progress = self._current_chunk / self._total_chunks
yield chunk, progress
yield chunk, self._current_chunk, self._total_chunks

def stop(self) -> None:
pass
Expand Down
5 changes: 2 additions & 3 deletions uploader/app/sources/fits.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def get_schema(self) -> list[models.ColumnDescription]:
)
return columns

def get_data(self) -> Generator[tuple[pandas.DataFrame, float]]:
def get_data(self) -> Generator[tuple[pandas.DataFrame, int, int]]:
if self._table is None:
raise RuntimeError("Plugin not prepared. Call prepare() first.")

Expand All @@ -70,9 +70,8 @@ def get_data(self) -> Generator[tuple[pandas.DataFrame, float]]:

df = pandas.DataFrame(batch_data)
self._current_batch += 1
progress = self._current_batch / self._total_batches

yield df, progress
yield df, self._current_batch, self._total_batches

def stop(self) -> None:
if self._hdu is not None:
Expand Down
4 changes: 2 additions & 2 deletions uploader/app/sources/vizier.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ def get_schema(self) -> list[models.ColumnDescription]:
for field in table.fields
]

def get_data(self) -> Generator[tuple[pandas.DataFrame, float]]:
def get_data(self) -> Generator[tuple[pandas.DataFrame, int, int]]:
if not self._obtain_cache_path("tables", self.catalog_name, self.table_name, ext="csv").exists():
app.logger.debug("did not hit cache for the table, downloading")
self._write_table_cache(self.catalog_name, self.table_name)
Expand All @@ -153,7 +153,7 @@ def get_data(self) -> Generator[tuple[pandas.DataFrame, float]]:
row_dict = {_sanitize_column_name(k): v for k, v in dict(row).items() if v != "--"}
rows.append(_coerce_row_to_schema(row_dict, schema))

yield pandas.DataFrame(rows), offset / total_rows
yield pandas.DataFrame(rows), offset, total_rows

def get_total_rows(self) -> int:
if not self._obtain_cache_path("tables", self.catalog_name, self.table_name, ext="csv").exists():
Expand Down
11 changes: 6 additions & 5 deletions uploader/app/structured/designations/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,14 @@ def _report_batch_progress(
report_func: Callable[[report.Event], None],
*,
rows_read: int,
processed_rows: int,
total_count: int,
total_so_far: int,
matched: int,
unmatched: int,
progress_pct: int,
rule_counts: dict[str, int],
) -> None:
report_func(report.ProgressEvent(percent=min(99, progress_pct)))
report_func(report.ProgressEvent(current=processed_rows, total=total_count))
report_func(
report.LogEvent(
message=(
Expand Down Expand Up @@ -97,7 +98,7 @@ def pct(n: int) -> float:
]
table_rows.append(("(no rule matched)", unmatched, pct(unmatched)))

report_func(report.ProgressEvent(percent=100))
report_func(report.ProgressEvent(current=total, total=total))

_emit_rule_distribution_image(
report_func,
Expand Down Expand Up @@ -188,14 +189,14 @@ def total_pct(n: int, t: int = total_so_far) -> float:
unmatched=unmatched,
unmatched_pct=round(total_pct(unmatched), 1),
)
progress_pct = int(100 * processed_rows / total_count) if total_count else 0
_report_batch_progress(
report_func,
rows_read=len(rows),
processed_rows=processed_rows,
total_count=total_count,
total_so_far=total_so_far,
matched=sum(rule_counts.values()),
unmatched=unmatched,
progress_pct=progress_pct,
rule_counts=rule_counts,
)

Expand Down
5 changes: 2 additions & 3 deletions uploader/app/structured/geometry/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,7 @@ def upload_geometry_isophotal(
)

processed_rows += len(rows)
row_pct = int(100 * processed_rows / total_count) if total_count else 0
report_func(report.ProgressEvent(percent=min(99, row_pct)))
report_func(report.ProgressEvent(current=processed_rows, total=total_count))
report_func(
report.LogEvent(
message=f"batch: rows_read={len(rows)} uploaded={uploaded} skipped={skipped}",
Expand Down Expand Up @@ -306,7 +305,7 @@ def row_pct_label(n: int) -> float:
("a mean (arcsec)", round(a_mean, 3), "-"),
],
)
report_func(report.ProgressEvent(percent=100))
report_func(report.ProgressEvent(current=total_count, total=total_count))
if uploaded > 0:
axis_dist.emit_image(
report_func,
Expand Down
5 changes: 2 additions & 3 deletions uploader/app/structured/icrs/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,7 @@ def upload_icrs(
)

processed_rows += len(rows)
row_pct = int(100 * processed_rows / total_count) if total_count else 0
report_func(report.ProgressEvent(percent=min(99, row_pct)))
report_func(report.ProgressEvent(current=processed_rows, total=total_count))
report_func(
report.LogEvent(
message=f"batch: rows_read={len(rows)} uploaded={uploaded} skipped={skipped}",
Expand Down Expand Up @@ -238,7 +237,7 @@ def row_pct_label(n: int) -> float:
("Dec mean", round(dec_mean, 6), "-"),
]
)
report_func(report.ProgressEvent(percent=100))
report_func(report.ProgressEvent(current=total_count, total=total_count))
sky.emit_image(report_func, caption=f"Final: {uploaded} objects")
summary = format_table(
("Status", "Count", "%"),
Expand Down
5 changes: 2 additions & 3 deletions uploader/app/structured/nature/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,7 @@ def upload_nature(
)

processed_rows += len(rows)
batch_pct = int(100 * processed_rows / total_count) if total_count else 0
report_func(report.ProgressEvent(percent=min(99, batch_pct)))
report_func(report.ProgressEvent(current=processed_rows, total=total_count))
report_func(
report.LogEvent(
message=f"batch: rows_read={len(rows)} total_uploaded_so_far={total_uploaded}",
Expand All @@ -129,7 +128,7 @@ def upload_nature(
)
for leda_type, count in sorted(type_counts.items())
]
report_func(report.ProgressEvent(percent=100))
report_func(report.ProgressEvent(current=total_count, total=total_count))
_emit_type_distribution_image(
report_func,
type_counts,
Expand Down
2 changes: 1 addition & 1 deletion uploader/app/structured/note.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,5 @@ def upload_note(
),
)
)
report_func(report.ProgressEvent(percent=100))
report_func(report.ProgressEvent(current=1, total=1))
report_func(report.DoneEvent(message="Note uploaded successfully."))
5 changes: 2 additions & 3 deletions uploader/app/structured/photometry/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,7 @@ def upload_photometry_hyperleda(
objects=uploaded_objects,
photometry_rows=uploaded_rows,
)
progress = 100.0 if total_rows == 0 else (100.0 * total_source_rows / total_rows)
report_func(report.ProgressEvent(percent=min(progress, 100.0)))
report_func(report.ProgressEvent(current=total_source_rows, total=total_rows))
finally:
total = uploaded_objects + skipped
total_photometry_rows = sum(band_counts.values())
Expand All @@ -127,5 +126,5 @@ def pct(n: int, denom: int) -> float:
percent_last_column=False,
)

report_func(report.ProgressEvent(percent=100))
report_func(report.ProgressEvent(current=total_rows, total=total_rows))
report_func(report.DoneEvent(message=summary))
5 changes: 2 additions & 3 deletions uploader/app/structured/redshift/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,7 @@ def upload_redshift(
)

processed_rows += len(rows)
batch_pct = int(100 * processed_rows / total_count) if total_count else 0
report_func(report.ProgressEvent(percent=min(99, batch_pct)))
report_func(report.ProgressEvent(current=processed_rows, total=total_count))
report_func(
report.LogEvent(
message=f"batch: rows_read={len(rows)} uploaded={uploaded} skipped={skipped}",
Expand Down Expand Up @@ -173,7 +172,7 @@ def row_pct_label(n: int) -> float:
("cz mean (km/s)", round(cz_mean, 2), "-"),
]
)
report_func(report.ProgressEvent(percent=100))
report_func(report.ProgressEvent(current=total_count, total=total_count))
if uploaded > 0:
cz_dist.emit_image(
report_func,
Expand Down
Loading
Loading