Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion benchmarking/baselines/inspect_prepost_feature_ablation.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def inspect_prepost_feature_ablation() -> pd.DataFrame:
wtg_numbers=DEFAULT_WTG_NUMBERS,
wtg_names=DEFAULT_TURBINE_SUBSET,
)
_rep, mi, truth = _pin_case(
_rep, mi, truth, _window = _pin_case(
scada_df,
study=study,
profile_name=DEFAULT_PROFILE,
Expand Down
84 changes: 80 additions & 4 deletions benchmarking/baselines/inspect_prepost_hard_case.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,15 @@
from benchmarking.baselines.power_model import PowerModelMethod
from benchmarking.baselines.v0_binned import V0BinnedMethod
from benchmarking.harness import (
CONDITION_BINS,
CampaignWindow,
Method,
MethodInput,
MethodOutput,
StudyConfig,
build_replicates,
campaign_windows,
plot_conditional_uplift,
treated_activity_mask,
window_row_mask,
)
Expand Down Expand Up @@ -106,8 +110,8 @@ def _select_replicate(replicates: list[Replicate], test_wtg: str) -> Replicate:

def _pin_case(
scada_df: pd.DataFrame, *, study: StudyConfig, profile_name: str, test_wtg: str, campaign_months: int
) -> tuple[Replicate, MethodInput, float]:
"""Build the pinned replicate, its shared ``MethodInput`` and the ground-truth uplift."""
) -> tuple[Replicate, MethodInput, float, CampaignWindow]:
"""Build the pinned replicate, its shared ``MethodInput``, the ground-truth uplift, and the window."""
profile = overnight_profiles()[profile_name]
replicates = build_replicates(scada_df, profile=profile, study=study)
rep = _select_replicate(replicates, test_wtg)
Expand Down Expand Up @@ -145,7 +149,7 @@ def _pin_case(
window.activity_end,
100 * truth,
)
return rep, mi, truth
return rep, mi, truth, window


def _build_methods(out_dir: Path, *, include_v0: bool) -> list[Method]:
Expand All @@ -161,6 +165,7 @@ def _build_methods(out_dir: Path, *, include_v0: bool) -> list[Method]:
PowerModelMethod(
active_power_col=HOT_COLUMNS.active_power,
wind_speed_col=HOT_COLUMNS.wind_speed,
wind_speed_sd_col=HOT_COLUMNS.wind_speed_sd,
availability_col=HOT_COLUMNS.availability,
era5_hourly_df=context.reanalysis_datasets[0].data,
out_dir=out_dir / "power_model",
Expand Down Expand Up @@ -199,6 +204,75 @@ def _run_methods(methods: list[Method], *, mi: MethodInput, truth: float) -> pd.
return pd.DataFrame(rows)


def _plot_conditional_uplift(
    rep: Replicate, mi: MethodInput, window: CampaignWindow, *, out_dir: Path, profile_name: str
) -> None:
    """Fit a fresh power model on ``mi`` and save one uplift-vs-truth plot per condition.

    A dedicated ``PowerModelMethod`` (writing under ``out_dir / "power_model_conditional"``) is
    estimated on ``mi``. Its ``p50_by_condition`` table is compared with the replicate's
    per-condition truth for ``"ws"`` and ``"ti"``, evaluated on the test turbine's rows under
    the treated-activity mask for ``window``. One PNG per condition with available truth is
    saved to ``out_dir``.

    Returns early with a warning when the method yields no per-condition estimate, or when no
    per-condition truth table is available.
    """
    era5_hourly = build_hot_v0_context(wtg_names=DEFAULT_TURBINE_SUBSET).reanalysis_datasets[0].data
    method = PowerModelMethod(
        active_power_col=HOT_COLUMNS.active_power,
        wind_speed_col=HOT_COLUMNS.wind_speed,
        wind_speed_sd_col=HOT_COLUMNS.wind_speed_sd,
        availability_col=HOT_COLUMNS.availability,
        era5_hourly_df=era5_hourly,
        out_dir=out_dir / "power_model_conditional",
    )
    estimate = method.estimate(mi)
    if estimate.p50_by_condition is None:
        logger.warning("power_model returned no p50_by_condition; skipping conditional uplift plots")
        return

    # Truth is evaluated on the test turbine's rows only, restricted by the treated-activity mask.
    is_test_row = rep.synthetic_df[HOT_COLUMNS.turbine] == rep.test_wtg
    treated = treated_activity_mask(rep.synthetic_df.loc[is_test_row].index, rep.upgrade_timing, window=window)

    # Keep only the conditions for which true_uplift produced a per-bin table.
    truth_tables: dict[str, pd.DataFrame] = {}
    for cond in ("ws", "ti"):
        table = rep.true_uplift(mask=treated, by=cond, bins=CONDITION_BINS[cond]).by_condition
        if table is not None:
            truth_tables[cond] = table
    if not truth_tables:
        logger.warning("No per-condition truth available; skipping conditional uplift plots")
        return

    comparison = conditional_truth_vs_estimate(estimate, truth_tables, method_name=method.name)
    for cond in truth_tables:
        plot_conditional_uplift(
            comparison,
            condition=cond,
            save_path=out_dir / f"conditional_uplift_{cond}.png",
            title=f"Conditional uplift ({cond}) — profile={profile_name}, wtg={mi.test_wtg}",
        )
    logger.info("Wrote conditional uplift plots to %s", out_dir)


def conditional_truth_vs_estimate(
    output: MethodOutput, truth_by_condition: dict[str, pd.DataFrame], *, method_name: str
) -> pd.DataFrame:
    """Shape a method's ``p50_by_condition`` + per-condition truth into a ``plot_conditional_uplift`` frame.

    Args:
        output: Method result; its ``p50_by_condition`` table must carry the columns
            ``condition``, ``condition_bin`` and ``p50_uplift``.
        truth_by_condition: Maps a condition name to a table with the columns
            ``condition_bin`` and ``true_uplift``.
        method_name: Label written to the ``method`` column of every row.

    Returns:
        A long frame with the columns ``method``, ``condition``, ``condition_bin``,
        ``mean_estimate`` and ``mean_truth``. Each condition contributes one row per bin in
        the union of estimated and true bins; a side that lacks a bin is NaN there. The frame
        is empty (with the same columns) when ``truth_by_condition`` is empty.

    Raises:
        ValueError: If ``output.p50_by_condition`` is ``None``.
    """
    columns = ["method", "condition", "condition_bin", "mean_estimate", "mean_truth"]
    by_condition = output.p50_by_condition
    if by_condition is None:
        msg = "output.p50_by_condition must not be None"
        raise ValueError(msg)

    frames = []
    for condition, truth in truth_by_condition.items():
        est_rows = by_condition[by_condition["condition"] == condition]
        # Bin labels are normalised to ``str`` on BOTH sides so that interval/categorical/int
        # labels on one side align with their string form on the other in the union below.
        estimate = est_rows.assign(condition_bin=est_rows["condition_bin"].astype(str)).set_index(
            "condition_bin"
        )["p50_uplift"]
        true_uplift = truth.assign(condition_bin=truth["condition_bin"].astype(str)).set_index(
            "condition_bin"
        )["true_uplift"]
        bins = estimate.index.union(true_uplift.index)
        frames.append(
            pd.DataFrame(
                {
                    "method": method_name,
                    "condition": condition,
                    "condition_bin": bins,
                    "mean_estimate": estimate.reindex(bins).to_numpy(),
                    "mean_truth": true_uplift.reindex(bins).to_numpy(),
                }
            )
        )
    if not frames:
        # ``pd.concat`` raises on an empty list; hand back an empty, correctly-shaped frame instead.
        return pd.DataFrame(columns=columns)
    return pd.concat(frames, ignore_index=True)


def inspect_prepost_hard_case(
*,
profile_name: str = DEFAULT_PROFILE,
Expand Down Expand Up @@ -228,12 +302,14 @@ def inspect_prepost_hard_case(
wtg_names=DEFAULT_TURBINE_SUBSET,
)

_rep, mi, truth = _pin_case(
rep, mi, truth, window = _pin_case(
scada_df, study=study, profile_name=profile_name, test_wtg=test_wtg, campaign_months=campaign_months
)
methods = _build_methods(out_dir, include_v0=include_v0)
summary = _run_methods(methods, mi=mi, truth=truth)

_plot_conditional_uplift(rep, mi, window, out_dir=out_dir, profile_name=profile_name)

summary_path = out_dir / "comparison_summary.csv"
summary.to_csv(summary_path, index=False)
logger.info("Wrote %s\n%s", summary_path, summary.to_string(index=False))
Expand Down
25 changes: 25 additions & 0 deletions benchmarking/baselines/power_model/features.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,31 @@ def reference_mean_wind_speed(
return pd.concat(cols, axis=1).mean(axis=1)


def test_condition_signals(
scada_df: pd.DataFrame,
*,
test_wtg: str,
turbine_col: str,
wind_speed_col: str,
wind_speed_sd_col: str | None,
) -> pd.DataFrame:
"""Test turbine's MEASURED ws and ti on the unique sorted index (post-treatment, accepted §3).

``ti`` is omitted when no SD column is configured.
"""
index = pd.DatetimeIndex(pd.unique(scada_df.index)).sort_values()
rows = scada_df[scada_df[turbine_col] == test_wtg]
ws = pd.Series(rows[wind_speed_col].to_numpy(dtype=float), index=pd.DatetimeIndex(rows.index))
ws = ws[~ws.index.duplicated()].reindex(index)
out = pd.DataFrame({"ws": ws})
if wind_speed_sd_col is not None and wind_speed_sd_col in scada_df.columns:
sd = pd.Series(rows[wind_speed_sd_col].to_numpy(dtype=float), index=pd.DatetimeIndex(rows.index))
sd = sd[~sd.index.duplicated()].reindex(index)
ws_arr = ws.to_numpy()
out["ti"] = np.divide(sd.to_numpy(), ws_arr, out=np.full(len(ws_arr), np.nan), where=ws_arr != 0)
return out


def check_reference_only(feature_names: list[str], *, test_wtg: str) -> None:
"""Raise if any feature is qualified with the test turbine (violating the §3 rule)."""
offenders = [f for f in feature_names if f.endswith(f"{QUALIFIER}{test_wtg}")]
Expand Down
31 changes: 30 additions & 1 deletion benchmarking/baselines/power_model/method.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@
era5_feature_frame,
extract_outcome,
reference_mean_wind_speed,
test_condition_signals,
)
from benchmarking.baselines.rlearner.nuisance import make_outcome_model
from benchmarking.diagnostics import DiagnosticContext, write_common_diagnostics, write_run_config
from benchmarking.harness.conditions import CONDITION_BINS, energy_ratio_by_bin
from benchmarking.harness.method import MethodInput, MethodOutput
from benchmarking.synthetic import HOT_COLUMNS, ToggleSchedule, treated_mask

Expand Down Expand Up @@ -93,6 +95,7 @@ class PowerModelMethod:
active_power_col: str
availability_col: str
wind_speed_col: str | None = None
wind_speed_sd_col: str | None = None
era5_hourly_df: pd.DataFrame | None = None
columns: ColumnSchema = HOT_COLUMNS
name: str = "power_model"
Expand Down Expand Up @@ -148,6 +151,17 @@ def estimate(self, mi: MethodInput) -> MethodOutput:
sum_counter = float(fit["pred_upgraded"].sum())
uplift = sum_actual / sum_counter - 1.0 if np.isfinite(sum_counter) and sum_counter != 0 else float("nan")

by_condition: pd.DataFrame | None = None
if self.wind_speed_col is not None:
conditions = test_condition_signals(
scada,
test_wtg=mi.test_wtg,
turbine_col=mi.turbine_col,
wind_speed_col=self.wind_speed_col,
wind_speed_sd_col=self.wind_speed_sd_col,
)
by_condition = self._conditional_uplift(conditions, upgraded_sel=upgraded_sel, fit=fit)

self._write(
mi,
index=index,
Expand All @@ -164,7 +178,7 @@ def estimate(self, mi: MethodInput) -> MethodOutput:
n_refs=n_refs,
era5=era5,
)
return MethodOutput(p50_overall=uplift)
return MethodOutput(p50_overall=uplift, p50_by_condition=by_condition)

def _add_era5(
self,
Expand Down Expand Up @@ -234,6 +248,20 @@ def _fit_predict(
"pred_baseline_valid": pred_valid,
}

def _conditional_uplift(
    self, conditions: pd.DataFrame, *, upgraded_sel: np.ndarray, fit: dict[str, Any]
) -> pd.DataFrame | None:
    """Collapse the upgraded-period actual vs counterfactual values into per-bin energy-ratio uplift.

    ``conditions`` is sliced positionally with ``upgraded_sel``. For each of ``"ws"`` and
    ``"ti"`` present as a column, ``energy_ratio_by_bin`` is evaluated against
    ``fit["y_upgraded"]`` and ``fit["pred_upgraded"]`` using that condition's ``CONDITION_BINS``.

    Returns the stacked ``condition`` / ``condition_bin`` / ``p50_uplift`` rows, or ``None``
    when neither condition column is present.
    """
    upgraded_conditions = conditions.iloc[upgraded_sel]
    y_actual, y_counterfactual = fit["y_upgraded"], fit["pred_upgraded"]

    per_condition = []
    for cond in ("ws", "ti"):
        if cond not in upgraded_conditions.columns:
            continue
        binned = energy_ratio_by_bin(
            upgraded_conditions[cond].to_numpy(), y_actual, y_counterfactual, bins=CONDITION_BINS[cond]
        )
        # Tag every bin row with the condition it belongs to before stacking.
        binned.insert(0, "condition", cond)
        per_condition.append(binned[["condition", "condition_bin", "p50_uplift"]])

    if not per_condition:
        return None
    return pd.concat(per_condition, ignore_index=True)

def _make_model(self) -> Any:  # noqa: ANN401
    """Build the outcome model, passing ``seed`` as ``random_state`` unless ``model_params`` overrides it."""
    params = {"random_state": self.seed}
    # Caller-supplied parameters are applied last, so an explicit ``random_state`` there wins.
    params.update(self.model_params)
    return make_outcome_model(**params)
Expand Down Expand Up @@ -361,6 +389,7 @@ def _config_params(self) -> dict[str, Any]:
"active_power_col": self.active_power_col,
"availability_col": self.availability_col,
"wind_speed_col": self.wind_speed_col,
"wind_speed_sd_col": self.wind_speed_sd_col,
"seed": self.seed,
"toggle_campaign_only": self.toggle_campaign_only,
"has_era5": self.era5_hourly_df is not None,
Expand Down
Loading