-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbatch_multi_plot_FAST_spectrograms.py
More file actions
2538 lines (2396 loc) · 113 KB
/
batch_multi_plot_FAST_spectrograms.py
File metadata and controls
2538 lines (2396 loc) · 113 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Plots a folder of FAST ESA data as spectrograms.
Assumed folder layout is::
{FAST_CDF_DATA_FOLDER_PATH}/year/month
Filenames in the month folders assumed to be in the following formats::
{??}_{??}_{??}_{instrument}_{timestamp}_{orbit}_v02.cdf (known "instruments" are ees, eeb, ies, or ieb)
{??}_{??}_orb_{orbit}_{??}.cdf
Examples::
FAST_data/2000/01/fa_esa_l2_eeb_20000101001737_13312_v02.cdf
FAST_data/2000/01/fa_k0_orb_13312_v01.cdf
"""
__authors__: list[str] = ["Ev Hansen"]
__contact__: str = "ephansen+gh@terpmail.umd.edu"
__credits__: list[list[str]] = [
["Ev Hansen", "Python code"],
["Emma Mirizio", "Co-Mentor"],
["Marilia Samara", "Co-Mentor"],
]
__date__: str = "2025-08-13"
__status__: str = "Development"
__version__: str = "0.0.1"
__license__: str = "GPL-3.0"
import signal
import os
import sys
import gc
import concurrent.futures
import json
from collections import defaultdict, deque
import math
from datetime import datetime, timezone
from pathlib import Path
from tqdm import tqdm
import traceback
import numpy as np
import cdflib
import time as _time
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
# Import only required helpers from generic spectrogram module (avoid wildcard import for linting clarity)
from batch_multi_plot_spectrogram import (
load_filtered_orbits,
get_cdf_file_type,
get_timestamps_for_orbit,
generic_plot_multirow_optional_zoom,
close_all_axes_and_clear,
log_error,
log_message,
DEFAULT_ZOOM_WINDOW_MINUTES,
)
# FAST-specific paths (renamed for FAST batch)
FAST_CDF_DATA_FOLDER_PATH = "./FAST_data/"
FAST_FILTERED_ORBITS_CSV_PATH = "./FAST_Cusp_Indices.csv"
FAST_PLOTTING_PROGRESS_JSON = "./batch_multi_plot_FAST_progress.json"
FAST_LOGFILE_PATH = "./batch_multi_plot_FAST_log.log"
FAST_OUTPUT_BASE = "./FAST_plots/"
FAST_LOGFILE_DATETIME_PATH = "./batch_multi_plot_FAST_logfile_datetime.txt"
if os.path.exists(FAST_LOGFILE_DATETIME_PATH):
with open(FAST_LOGFILE_DATETIME_PATH, "r") as f:
FAST_LOGFILE_DATETIME_STRING = f.read().strip()
if not FAST_LOGFILE_DATETIME_STRING:
FAST_LOGFILE_DATETIME_STRING = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
with open(FAST_LOGFILE_DATETIME_PATH, "w") as f:
f.write(FAST_LOGFILE_DATETIME_STRING)
else:
FAST_LOGFILE_DATETIME_STRING = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
with open(FAST_LOGFILE_DATETIME_PATH, "w") as f:
f.write(FAST_LOGFILE_DATETIME_STRING)
FAST_COLLAPSE_FUNCTION = np.nansum
CDF_VARIABLES = ["time_unix", "data", "energy", "pitch_angle"]
# Colormaps for different axis scaling combinations (colorblind-friendly and visually distinct)
DEFAULT_COLORMAP_LINEAR_Y_LINEAR_Z = "viridis"
DEFAULT_COLORMAP_LINEAR_Y_LOG_Z = "cividis"
DEFAULT_COLORMAP_LOG_Y_LINEAR_Z = "plasma"
DEFAULT_COLORMAP_LOG_Y_LOG_Z = "inferno"
# Buffered logging configuration (batching to reduce I/O)
INFO_LOG_BATCH_SIZE_DEFAULT = 10 # fallback default
_INFO_LOG_BATCH_SIZE = INFO_LOG_BATCH_SIZE_DEFAULT
_INFO_LOG_BUFFER: List[Tuple[str, str]] = [] # (level, message)
def configure_info_logger_batch(batch_size: int) -> None:
"""Configure the batch size for buffered info logging.
Parameters
----------
batch_size : int
Number of log entries to accumulate before an automatic flush.
Values < 1 disable buffering (flush every call).
"""
global _INFO_LOG_BATCH_SIZE
if batch_size < 1:
_INFO_LOG_BATCH_SIZE = 1
else:
_INFO_LOG_BATCH_SIZE = batch_size
def flush_info_logger_buffer(force: bool = True) -> None:
"""Flush any buffered info/error log messages immediately.
Parameters
----------
force : bool, default True
Present for future extensibility; currently ignored (always flushes).
"""
global _INFO_LOG_BUFFER
if not _INFO_LOG_BUFFER:
return
# Emit each buffered message using underlying logger functions directly
for level, msg in _INFO_LOG_BUFFER:
try:
if level == "error":
try:
log_error(msg)
except Exception as buffered_error_emit_exception:
print(msg, file=sys.stderr)
else:
try:
log_message(msg)
except Exception as buffered_message_emit_exception:
print(msg)
except Exception as buffered_flush_loop_exception:
# Suppress any failure in flush loop, continue with remaining entries
pass
_INFO_LOG_BUFFER = []
# Section: Logging & Helper Functions
def info_logger(
prefix: str,
exception: Optional[BaseException] = None,
level: str = "error",
include_trace: bool = False,
force_flush: bool = False,
) -> None:
"""Unified logger for messages and exceptions.
This helper formats a message with an optional exception, includes the
exception class name when provided, and delegates to generic logging
helpers from the base module (``log_message``/``log_error``). When
``include_trace`` is True and an exception is given, a traceback is also
emitted.
Parameters
----------
prefix : str
Human-readable message prefix for the log line.
exception : BaseException or None
Optional exception instance. If ``None``, only ``prefix`` is logged.
level : {'error', 'message'}, default 'error'
Logging level. ``'error'`` routes to ``log_error``; otherwise to
``log_message``.
include_trace : bool, default False
When ``True`` and ``exception`` is not ``None``, include a formatted
traceback after the primary log message.
Parameters
----------
force_flush : bool, default False
When True, forces an immediate flush of the buffered log messages
(including the current one) regardless of batch size.
Returns
-------
None
Notes
-----
If the underlying logging helpers fail (e.g., misconfigured), this
function falls back to printing to stdout/stderr to avoid silent loss. When
an exception is provided, messages are formatted as
``"{prefix} [<ExceptionClass>]: {exception}"``.
"""
if exception is None:
message = str(prefix)
else:
try:
name = getattr(exception, "__class__", type(exception)).__name__
except Exception as exception_name_introspection_exception:
name = "Exception"
message = f"{prefix} [{name}]: {exception}"
# Prepare trace (as separate buffered messages) if requested
trace_lines: List[str] = []
if include_trace and exception is not None:
try:
trace = "".join(
traceback.format_exception(
type(exception), exception, exception.__traceback__
)
)
trace_lines.append("[TRACE]\n" + trace)
except Exception as trace_format_exception:
pass
# Buffer message
try:
_INFO_LOG_BUFFER.append((level, message))
for tr in trace_lines:
_INFO_LOG_BUFFER.append(("message", tr))
# Decide flush
if (
force_flush
or _INFO_LOG_BATCH_SIZE <= 1
or len(_INFO_LOG_BUFFER) >= _INFO_LOG_BATCH_SIZE
):
flush_info_logger_buffer(force=True)
except Exception as info_logger_buffer_append_exception:
# On any buffering failure, attempt direct emission
try:
if level == "error":
log_error(message)
else:
log_message(message)
except Exception as info_logger_direct_emit_exception:
try:
print(message)
except Exception as info_logger_print_fallback_exception:
pass
def _terminate_all_child_processes() -> None:
"""Attempt to terminate all child processes of the current process.
Uses ``psutil``, if available, to iterate over child processes recursively
and call ``terminate()`` on each. Errors are suppressed, as this is
typically invoked during shutdown.
Returns
-------
None
Notes
-----
This is best-effort and does not guarantee exit. Callers may follow with
stronger measures (e.g., ``kill``) after a brief grace period.
"""
try:
import psutil
except Exception as psutil_import_exception:
return
try:
current_process = psutil.Process()
for child in current_process.children(recursive=True):
try:
child.terminate()
except Exception as child_terminate_exception:
pass
except Exception as psutil_process_iter_exception:
pass
def round_extrema(value: float | int, direction: str) -> float:
"""
Round an extrema value up or down to a visually clean axis limit.
This function is used to make plot axis extrema (min/max) more visually appealing
and consistent by rounding them to the nearest significant digit in the specified direction.
For example, 1234 rounded up becomes 1300, and 0.0123 rounded down becomes 0.012.
Parameters
----------
value : float or int
The extrema value to round. If zero, returns 0.0.
direction : {'up', 'down'}
The direction to round:
- 'up': round up to the next clean value (for maxima)
- 'down': round down to the previous clean value (for minima)
Returns
-------
float
The rounded extrema value.
Raises
------
ValueError
If an invalid direction is provided.
Examples
--------
>>> round_extrema(1234, 'up')
1300.0
>>> round_extrema(1234, 'down')
1200.0
>>> round_extrema(0.0123, 'up')
0.013
>>> round_extrema(0.0123, 'down')
0.012
"""
# Special case: zero should always round to zero
if value == 0:
return 0.0
# Determine the rounding factor based on the order of magnitude
factor = 10 ** (math.floor(math.log10(abs(value))) - 1)
# Round up for maxima, down for minima
if direction == "up":
# Use math.ceil to ensure we always round up
return float(math.ceil(value / factor) * factor)
elif direction == "down":
# Use math.floor to ensure we always round down
return float(math.floor(value / factor) * factor)
else:
# Raise an error for invalid direction arguments
raise ValueError(f"Invalid direction: {direction}")
def FAST_plot_pitch_angle_grid(
cdf_file_path: str,
filtered_orbits_df=None,
orbit_number: Optional[int] = None,
zoom_duration_minutes: float = 6.25,
scale_function_y: str = "linear",
scale_function_z: str = "linear",
pitch_angle_categories: Optional[Dict[str, List[Tuple[float, float]]]] = None,
show: bool = True,
colormap: str = "viridis",
y_min: Optional[float] = None,
y_max: Optional[float] = None,
z_min: Optional[float] = None,
z_max: Optional[float] = None,
) -> Tuple[Any, Any]:
"""Plot a grid of ESA spectrograms collapsed by pitch-angle categories.
Each row corresponds to a pitch-angle category (e.g., downgoing, upgoing,
perpendicular, all). If orbit boundary timestamps are available for this
instrument/orbit, a zoom column is added. Data are loaded from the CDF,
oriented so that energy is the y-axis, and collapsed over pitch-angle via
``FAST_COLLAPSE_FUNCTION`` (``np.nansum`` by default).
Parameters
----------
cdf_file_path : str
Path to the instrument CDF file.
filtered_orbits_df : pandas.DataFrame or None
DataFrame used to compute vertical lines for the ``orbit_number``.
If ``None``, vertical lines are omitted.
orbit_number : int or None
Orbit number used to compute and label vertical lines.
zoom_duration_minutes : float, default 6.25
Window length (minutes) for the optional zoom column.
scale_function_y : {'linear', 'log'}, default 'linear'
Y-axis scaling for the spectrogram.
scale_function_z : {'linear', 'log'}, default 'linear'
Color scale for the spectrogram intensity.
pitch_angle_categories : dict or None
Mapping of label -> list of (min_deg, max_deg) ranges; if ``None``,
defaults to standard four groups.
show : bool, default True
If ``True``, display the plot; otherwise render off-screen.
colormap : str, default 'viridis'
Matplotlib colormap name.
y_min, y_max : float or None, optional
Optional explicit energy (y-axis) limits override. When ``None`` the
default lower bound (0) and observed upper bound (<=4000) subset is
used. These overrides are typically provided by precomputed global
extrema.
z_min, z_max : float or None, optional
Optional explicit color (intensity) scale limits. When ``None`` the
1st / 99th percentiles per row are used (robust to outliers). When
provided (e.g., global extrema), they are applied uniformly across
rows.
Returns
-------
tuple[Figure or None, FigureCanvasBase or None]
Figure and Canvas for the grid, or ``(None, None)`` if no datasets.
Notes
-----
- Energy bins are filtered to ``[0, 4000]`` (or explicit ``y_min`` / ``y_max``).
- ``vmin``/``vmax`` (row color bounds) are derived from 1st/99th percentiles
unless explicit ``z_min`` / ``z_max`` provided.
- Each dataset row includes ``y_label='Energy (eV)'`` and ``z_label='Counts'``;
modify after return if alternative units are desired.
- When no pitch-angle category yields data, the function logs a message and
returns ``(None, None)``.
"""
# TODO: record orbits when error contains "is not a CDF file or a non-supported CDF!" in json log file
if pitch_angle_categories is None:
pitch_angle_categories = {
"downgoing\n(0, 30), (330, 360)": [(0, 30), (330, 360)],
"upgoing\n(150, 210)": [(150, 210)],
"perpendicular\n(40, 140), (210, 330)": [(40, 140), (210, 330)],
"all\n(0, 360)": [(0, 360)],
}
instrument_type = get_cdf_file_type(cdf_file_path)
cdf_file = cdflib.CDF(cdf_file_path)
times = np.asarray(cdf_file.varget(CDF_VARIABLES[0]))
data = np.asarray(cdf_file.varget(CDF_VARIABLES[1]))
energy_full = np.asarray(cdf_file.varget(CDF_VARIABLES[2]))
pitchangle_full = np.asarray(cdf_file.varget(CDF_VARIABLES[3]))
energy = energy_full[0, 0, :] if energy_full.ndim == 3 else energy_full
pitchangle = (
pitchangle_full[0, :, 0] if pitchangle_full.ndim == 3 else pitchangle_full
)
if data.shape[1] == len(energy) and data.shape[2] == len(pitchangle):
data = np.transpose(data, (0, 2, 1))
vertical_lines = None
if filtered_orbits_df is not None and orbit_number is not None:
vertical_lines = get_timestamps_for_orbit(
filtered_orbits_df, orbit_number, instrument_type, times
)
if (vertical_lines is None) or (len(vertical_lines) == 0):
info_logger(
f"No vertical lines found for orbit {orbit_number} in {cdf_file_path}. Skipping.",
level="message",
)
pa_keys = [
"all\n(0, 360)",
"downgoing\n(0, 30), (330, 360)",
"upgoing\n(150, 210)",
"perpendicular\n(40, 140), (210, 330)",
]
datasets = []
for key in pa_keys:
mask = np.zeros_like(pitchangle, dtype=bool)
for rng in pitch_angle_categories[key]:
mask |= (pitchangle >= rng[0]) & (pitchangle <= rng[1])
pa_data = data[:, mask, :]
matrix_full = FAST_COLLAPSE_FUNCTION(pa_data, axis=1)
nan_col_mask = ~np.all(np.isnan(matrix_full), axis=0)
# Apply energy (y-axis) limits; default 0-4000 if not overridden
y_lower = 0 if y_min is None else y_min
y_upper = 4000 if y_max is None else y_max
valid_energy_mask = (energy >= y_lower) & (energy <= y_upper)
combined_mask = nan_col_mask & valid_energy_mask
matrix_full = matrix_full[:, combined_mask]
matrix_full_plot = matrix_full.T
if matrix_full_plot.size == 0:
continue
# Color (z-axis) min/max percentiles unless overridden
if z_min is None:
vmin = np.nanpercentile(matrix_full_plot, 1)
else:
vmin = z_min
if z_max is None:
vmax = np.nanpercentile(matrix_full_plot, 99)
else:
vmax = z_max
# Include per-row y/z overrides so downstream generic grid honors them
datasets.append(
{
"x": times,
"y": energy,
"data": pa_data,
"label": key.title(),
"y_label": "Energy (eV)",
"z_label": "Counts",
"vmin": vmin, # color range (row-specific)
"vmax": vmax,
"y_min": y_lower,
"y_max": y_upper,
# z_min/z_max are not repeated unless provided explicitly to avoid
# forcing identical bounds when percentile scaling applied
**({"z_min": z_min} if z_min is not None else {}),
**({"z_max": z_max} if z_max is not None else {}),
}
)
if not datasets:
info_logger(
f"[WARNING] No pitch angle datasets to plot for {cdf_file_path}.",
level="message",
)
return None, None
title = f"Orbit {orbit_number} - Pitch Angle {instrument_type} ESA Spectrograms"
return generic_plot_multirow_optional_zoom(
datasets,
vertical_lines=vertical_lines,
zoom_duration_minutes=zoom_duration_minutes,
y_scale=scale_function_y,
z_scale=scale_function_z,
colormap=colormap,
show=show,
title=title,
row_label_pad=50,
row_label_rotation=90,
y_min=y_min,
y_max=y_max,
z_min=z_min,
z_max=z_max,
)
def FAST_plot_instrument_grid(
cdf_file_paths: Dict[str, str],
filtered_orbits_df=None,
orbit_number: Optional[int] = None,
zoom_duration_minutes: float = 6.25,
scale_function_y: str = "linear",
scale_function_z: str = "linear",
instrument_order: Tuple[str, ...] = ("ees", "eeb", "ies", "ieb"),
show: bool = True,
colormap: str = "viridis",
y_min: Optional[float] = None,
y_max: Optional[float] = None,
z_min: Optional[float] = None,
z_max: Optional[float] = None,
global_extrema: Optional[Dict[str, Union[int, float]]] = None,
) -> Tuple[Any, Any]:
"""Plot a multi-instrument ESA spectrogram grid for a single orbit.
Loads each instrument CDF, orients and filters the data, collapses across
pitch-angle, and constructs datasets for
``generic_plot_multirow_optional_zoom``. When vertical lines are available
for the orbit, a zoom column is included.
Parameters
----------
cdf_file_paths : dict of {str: str}
Mapping of instrument key (``'ees'``, ``'eeb'``, ``'ies'``, ``'ieb'``)
to CDF file path. Missing instruments are skipped.
filtered_orbits_df : pandas.DataFrame or None
DataFrame for vertical line computation; if ``None``, lines are omitted.
orbit_number : int or None
Orbit identifier used in titles/lines.
zoom_duration_minutes : float, default 6.25
Zoom window length (minutes).
scale_function_y : {'linear', 'log'}, default 'linear'
Y-axis scaling.
scale_function_z : {'linear', 'log'}, default 'linear'
Color scale for intensity.
instrument_order : tuple of str, default ('ees', 'eeb', 'ies', 'ieb')
Display order of instrument rows.
show : bool, default True
Whether to show the figure interactively.
colormap : str, default 'viridis'
Matplotlib colormap name.
y_min, y_max, z_min, z_max : float or None, optional
Global fallback overrides for axis/color limits. Per-instrument
overrides from ``global_extrema`` take precedence; finally row-level
percentile scaling is used when neither is provided.
global_extrema : dict or None
Mapping containing precomputed extrema keys (``{instrument}_{y_scale}_{z_scale}_{axis}_{min|max}``) used
to supply per-instrument (row-specific) limits. This enables distinct
y/z ranges for ``ees``, ``eeb``, ``ies``, and ``ieb`` within the same
figure, improving contrast when dynamic ranges differ.
y_min, y_max : float or None, optional
Direct energy bounds override applied when ``global_extrema`` is not
provided. Ignored per-instrument when ``global_extrema`` supplies
instrument-specific keys.
z_min, z_max : float or None, optional
Direct intensity scale overrides (see above re: ``global_extrema``).
global_extrema : dict or None, optional
Mapping containing precomputed extrema keys of the form
``{instrument}_{y_scale}_{z_scale}_{axis}_{min|max}``. When present
these take precedence over ``y_min`` / ``y_max`` / ``z_min`` /
``z_max``.
Returns
-------
tuple[Figure or None, FigureCanvasBase or None]
Figure and Canvas, or ``(None, None)`` if no datasets.
Notes
-----
- Files that fail to load are logged and skipped; remaining instruments may
still render.
- Energy bins are restricted to ``[0, 4000]`` (or overridden via ``global_extrema``
or explicit ``y_min`` / ``y_max``).
- ``vmin``/``vmax`` per row use 1st/99th percentiles for robust scaling unless
``global_extrema`` provides per-instrument ``z_min`` / ``z_max``.
- Each dataset row sets ``y_label='Energy (eV)'`` and ``z_label='Counts'`` by
default for clarity of physical units.
"""
datasets = []
vertical_lines = None
first_times = None
for inst in instrument_order:
cdf_path = cdf_file_paths.get(inst)
if not cdf_path:
continue
try:
cdf_file = cdflib.CDF(cdf_path)
times = np.asarray(cdf_file.varget(CDF_VARIABLES[0]))
data = np.asarray(cdf_file.varget(CDF_VARIABLES[1]))
energy_full = np.asarray(cdf_file.varget(CDF_VARIABLES[2]))
energy = energy_full[0, 0, :] if energy_full.ndim == 3 else energy_full
pitchangle_full = np.asarray(cdf_file.varget(CDF_VARIABLES[3]))
pitchangle = (
pitchangle_full[0, :, 0]
if pitchangle_full.ndim == 3
else pitchangle_full
)
if data.shape[1] == len(energy) and data.shape[2] == len(pitchangle):
data = np.transpose(data, (0, 2, 1))
if first_times is None:
first_times = times
if (
vertical_lines is None
and filtered_orbits_df is not None
and orbit_number is not None
):
instrument_type = get_cdf_file_type(cdf_path)
vertical_lines = get_timestamps_for_orbit(
filtered_orbits_df, orbit_number, instrument_type, times
)
if (vertical_lines is None) or (len(vertical_lines) == 0):
info_logger(
f"No vertical lines found for orbit {orbit_number} in {cdf_path}. Skipping.",
level="message",
)
matrix_full = FAST_COLLAPSE_FUNCTION(data, axis=1)
nan_col_mask = ~np.all(np.isnan(matrix_full), axis=0)
# Determine instrument-specific y bounds (energy)
if isinstance(global_extrema, dict):
key_prefix = f"{inst}_{scale_function_y}_{scale_function_z}"
y_lower = global_extrema.get(
f"{key_prefix}_y_min", 0 if y_min is None else y_min
)
y_upper = global_extrema.get(
f"{key_prefix}_y_max", 4000 if y_max is None else y_max
)
else:
y_lower = 0 if y_min is None else y_min
y_upper = 4000 if y_max is None else y_max
valid_energy_mask = (energy >= y_lower) & (energy <= y_upper)
combined_mask = nan_col_mask & valid_energy_mask
matrix_full = matrix_full[:, combined_mask]
matrix_full_plot = matrix_full.T
if matrix_full_plot.size == 0:
continue
# Determine instrument-specific z bounds (intensity)
if isinstance(global_extrema, dict):
key_prefix = f"{inst}_{scale_function_y}_{scale_function_z}"
vmin = global_extrema.get(f"{key_prefix}_z_min")
vmax = global_extrema.get(f"{key_prefix}_z_max")
if vmin is None:
vmin = np.nanpercentile(matrix_full_plot, 1)
if vmax is None:
vmax = np.nanpercentile(matrix_full_plot, 99)
else:
if z_min is None:
vmin = np.nanpercentile(matrix_full_plot, 1)
else:
vmin = z_min
if z_max is None:
vmax = np.nanpercentile(matrix_full_plot, 99)
else:
vmax = z_max
# Provide per-row overrides so generic multi-row plot can honor
# distinct instrument ranges for both y (energy) and z (intensity).
datasets.append(
{
"x": times,
"y": energy,
"data": data,
"label": inst.upper(),
"y_label": "Energy (eV)",
"z_label": "Counts",
"vmin": vmin,
"vmax": vmax,
"y_min": y_lower,
"y_max": y_upper,
# Only include z_min/z_max if explicitly fixed by global extrema
**({"z_min": z_min} if z_min is not None else {}),
**({"z_max": z_max} if z_max is not None else {}),
}
)
except Exception as file_load_failure:
info_logger(
f"Failed to load CDF for {inst} at {cdf_path}. Skipping.",
file_load_failure,
level="error",
)
continue
if not datasets:
return None, None
title = f"Orbit {orbit_number} - ESA Spectrograms"
return generic_plot_multirow_optional_zoom(
datasets,
vertical_lines=vertical_lines,
zoom_duration_minutes=zoom_duration_minutes,
y_scale=scale_function_y,
z_scale=scale_function_z,
colormap=colormap,
show=show,
title=title,
row_label_pad=50,
row_label_rotation=90,
y_min=y_min,
y_max=y_max,
z_min=z_min,
z_max=z_max,
)
# (Residual code from previous implementation removed in refactor.)
def FAST_process_single_orbit(
orbit_number: int,
instrument_file_paths: Dict[str, str],
filtered_orbits_dataframe,
zoom_duration_minutes: float,
y_axis_scale: str,
z_axis_scale: str,
instrument_order: Tuple[str, ...],
colormap: str,
output_base_directory: str,
orbit_timeout_seconds: Union[int, float] = 60,
instrument_timeout_seconds: Union[int, float] = 30,
global_extrema: Optional[Dict[str, Union[int, float]]] = None,
) -> Dict[str, Any]:
"""Process all plots for a single orbit with timeouts and deferred saving.
For each available instrument, renders a pitch-angle grid, then renders a
combined instrument grid. Figures are accumulated in memory and saved only
if no timeout thresholds are exceeded.
Parameters
----------
orbit_number : int
The orbit identifier.
instrument_file_paths : dict of {str: str}
Mapping of instrument key to CDF file path.
filtered_orbits_dataframe : pandas.DataFrame
DataFrame used to compute orbit boundary timestamps.
zoom_duration_minutes : float
Zoom window length for zoomed plots.
y_axis_scale : {'linear', 'log'}
Y-axis scaling.
z_axis_scale : {'linear', 'log'}
Color scale for intensity.
instrument_order : tuple of str
Order used in the instrument grid.
colormap : str
Matplotlib colormap.
output_base_directory : str
Root folder for saving figures; year/month are inferred from the CDF
path when possible, else ``'unknown'``.
orbit_timeout_seconds : int or float, default 60
Maximum wall-clock seconds permitted for the entire orbit processing (summed).
instrument_timeout_seconds : int or float, default 30
Per-instrument rendering timeout; exceeded instruments are skipped and noted.
global_extrema : dict or None
Precomputed extrema mapping used to supply uniform axis limits to all
instrument plots for deterministic scaling; produced by
``compute_global_extrema``.
orbit_timeout_seconds : int or float, default 60
Max total time for this orbit; if exceeded, status becomes ``'timeout'``
and no figures are saved.
instrument_timeout_seconds : int or float, default 30
Max time per instrument (and for the instrument grid). Exceeding this
aborts the orbit without saving.
global_extrema : dict or None, optional
Precomputed extrema dictionary (see ``compute_global_extrema``) used
to supply consistent per-instrument axis limits across all orbits.
Returns
-------
dict
Result dictionary with keys:
``orbit`` (int), ``status`` (``'ok'``, ``'error'``, or ``'timeout'``),
``errors`` (list of str). On timeout, includes ``timeout_type`` and
``timeout_instrument`` when applicable.
Notes
-----
- Timing diagnostics are logged per instrument and for the grid.
- Exceptions during plotting/saving are logged; processing continues when
safe. Figures are closed in all cases to free memory.
"""
result = {"orbit": orbit_number, "status": "ok", "errors": []}
orbit_start_time = _time.time()
pending_figures = [] # defer saving until timeouts cleared
timeout_triggered = False
timeout_type = None
timeout_instrument = None
try:
# Derive year/month for output path
year = "unknown"
month = "unknown"
first_path = next(
(
instrument_file_paths[k]
for k in ("ees", "eeb", "ies", "ieb")
if k in instrument_file_paths
),
None,
)
if first_path:
try:
parts = Path(first_path).parts
for i, part in enumerate(parts):
if part.isdigit() and len(part) == 4:
year = part
if (
i + 1 < len(parts)
and parts[i + 1].isdigit()
and len(parts[i + 1]) == 2
):
month = parts[i + 1]
break
except Exception as year_month_parse_exception:
info_logger(
"[WARN] Could not parse year/month",
year_month_parse_exception,
level="message",
)
output_dir = os.path.join(
output_base_directory, str(year), str(month), str(orbit_number)
)
os.makedirs(output_dir, exist_ok=True)
# Per-instrument processing
for inst_type in ("ees", "eeb", "ies", "ieb"):
if timeout_triggered:
break
cdf_path = instrument_file_paths.get(inst_type)
if not cdf_path:
continue
inst_start = _time.time()
try:
inst_detected = get_cdf_file_type(cdf_path)
if inst_detected is None or inst_detected == "orb":
continue
cdf_obj = cdflib.CDF(cdf_path)
time_unix_array = np.asarray(cdf_obj.varget("time_unix"))
vertical_lines = get_timestamps_for_orbit(
filtered_orbits_dataframe,
orbit_number,
inst_detected,
time_unix_array,
)
# Lookup global extrema overrides
y_min_override = None
y_max_override = None
z_min_override = None
z_max_override = None
if isinstance(global_extrema, dict):
key_base = f"{inst_detected}_{y_axis_scale}_{z_axis_scale}"
y_min_raw = global_extrema.get(f"{key_base}_y_min")
y_max_raw = global_extrema.get(f"{key_base}_y_max")
z_min_raw = global_extrema.get(f"{key_base}_z_min")
z_max_raw = global_extrema.get(f"{key_base}_z_max")
y_min_override = (
round_extrema(y_min_raw, "down")
if y_min_raw is not None
else None
)
y_max_override = (
round_extrema(y_max_raw, "up")
if y_max_raw is not None
else None
)
z_min_override = (
round_extrema(z_min_raw, "down")
if z_min_raw is not None
else None
)
z_max_override = (
round_extrema(z_max_raw, "up")
if z_max_raw is not None
else None
)
fig_pa, canvas_pa = FAST_plot_pitch_angle_grid(
cdf_path,
filtered_orbits_df=filtered_orbits_dataframe,
orbit_number=orbit_number,
zoom_duration_minutes=zoom_duration_minutes,
scale_function_y=y_axis_scale,
scale_function_z=z_axis_scale,
show=False,
colormap=colormap,
y_min=y_min_override,
y_max=y_max_override,
z_min=z_min_override,
z_max=z_max_override,
)
if fig_pa is not None:
cusp = vertical_lines is not None and len(vertical_lines) > 0
filename = f"{orbit_number}{'_cusp' if cusp else ''}_pitch-angle_ESA_{inst_detected}_y-{y_axis_scale}_z-{z_axis_scale}.png"
pending_figures.append(
{
"figure": fig_pa,
"canvas": canvas_pa,
"path": os.path.join(output_dir, filename),
"desc": f"pitch-angle {inst_detected}",
}
)
except Exception as instrument_exception:
err = f"[FAIL] Plotting Orbit {orbit_number} pitch angle grid for {inst_type}"
info_logger(err, instrument_exception, level="error")
result["status"] = "error"
result.setdefault("errors", []).append(err)
continue
finally:
inst_elapsed = _time.time() - inst_start
info_logger(
f"[TIMING] Orbit {orbit_number} instrument {inst_type} elapsed {inst_elapsed:.3f}s",
level="message",
)
if inst_elapsed > instrument_timeout_seconds and not timeout_triggered:
timeout_triggered = True
timeout_type = "instrument"
timeout_instrument = inst_type
info_logger(
f"[TIMEOUT] Instrument {inst_type} in orbit {orbit_number} exceeded {instrument_timeout_seconds:.0f}s ({inst_elapsed:.2f}s). Aborting orbit without saving.",
level="message",
)
break
# Instrument grid (if still OK)
grid_elapsed = None
if not timeout_triggered:
grid_start = _time.time()
try:
# Provide global_extrema dict so per-instrument limits are applied inside grid helper
fig_grid, canvas_grid = FAST_plot_instrument_grid(
instrument_file_paths,
filtered_orbits_df=filtered_orbits_dataframe,
orbit_number=orbit_number,
zoom_duration_minutes=zoom_duration_minutes,
scale_function_y=y_axis_scale,
scale_function_z=z_axis_scale,
instrument_order=instrument_order,
show=False,
colormap=colormap,
global_extrema=global_extrema,
)
if fig_grid is not None:
pending_figures.append(
{
"figure": fig_grid,
"canvas": canvas_grid,
"path": os.path.join(
output_dir,
f"{orbit_number}_instrument-grid_ESA_y-{y_axis_scale}_z-{z_axis_scale}.png",
),
"desc": "instrument-grid",
}
)
except Exception as instrument_grid_exception:
err = f"[FAIL] Plotting Orbit {orbit_number} instrument grid"
info_logger(err, instrument_grid_exception, level="error")
result["status"] = "error"
result.setdefault("errors", []).append(err)
finally:
grid_elapsed = _time.time() - grid_start
info_logger(
f"[TIMING] Orbit {orbit_number} instrument-grid elapsed {grid_elapsed:.3f}s",
level="message",
)
if (
grid_elapsed is not None
and grid_elapsed > instrument_timeout_seconds
and not timeout_triggered
):
timeout_triggered = True
timeout_type = "instrument"
timeout_instrument = "instrument_grid"
info_logger(
f"[TIMEOUT] Instrument grid in orbit {orbit_number} exceeded {instrument_timeout_seconds:.0f}s ({grid_elapsed:.2f}s). Aborting orbit without saving.",
level="message",
)
# Orbit total timeout check
orbit_elapsed = _time.time() - orbit_start_time
if orbit_elapsed > orbit_timeout_seconds and not timeout_triggered:
timeout_triggered = True
timeout_type = "orbit"
info_logger(
f"[TIMEOUT] Orbit {orbit_number} exceeded {orbit_timeout_seconds:.0f}s total ({orbit_elapsed:.2f}s). Aborting without saving.",
level="message",
)
# If timeout -> discard pending figures
if timeout_triggered:
for fig_item in pending_figures:
try:
close_all_axes_and_clear(fig_item["figure"])
except Exception as pitch_angle_energy_transpose_exception:
pass
pending_figures.clear()
result["status"] = "timeout"
result["timeout_type"] = timeout_type
if timeout_instrument:
result["timeout_instrument"] = timeout_instrument
return result
# Save figures now
for fig_item in pending_figures:
fig = fig_item["figure"]
fpath = fig_item["path"]
try:
info_logger(