From 9eb3cf7bc9fc4f0db170ef06d70d0a35fc78f471 Mon Sep 17 00:00:00 2001 From: Vahid Tavakkoli Date: Thu, 26 Mar 2026 14:24:06 +0100 Subject: [PATCH] Reshape Madrid zoneI data into multi-frequency training features --- README.md | 7 ++ generate_data.py | 87 +++++++++++++- oran_sim/data.py | 187 +++++++++++++++++++++++------- scripts/train.py | 11 +- tests/test_madrid_zone_dataset.py | 86 ++++++++++++++ 5 files changed, 332 insertions(+), 46 deletions(-) create mode 100644 tests/test_madrid_zone_dataset.py diff --git a/README.md b/README.md index 5d34354..015275a 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,13 @@ This repository now supports end-to-end KPM processing with one consistent workf python generate_data.py --steps 5000 --input shared_data/dataset-kpm --output shared_data/traffic_data.csv ``` +Madrid LTE Zone I (frequencies `f796`, `f1815`, `f2650`) is also supported: +```bash +python generate_data.py --steps 5000 --input dataset/madrid-lte-dataset/zoneI --output shared_data/traffic_data.csv +``` +This produces aligned per-second features such as: +`downlink_f796`, `uplink_f796`, `users_f796`, `downlink_f1815`, `uplink_f1815`, `users_f1815`, `downlink_f2650`, `uplink_f2650`, `users_f2650`. + ### Train ```bash python -m scripts.train --csv shared_data/traffic_data.csv --out_dir results/model --seed 42 diff --git a/generate_data.py b/generate_data.py index 1566536..1f1646b 100644 --- a/generate_data.py +++ b/generate_data.py @@ -13,6 +13,83 @@ from oran_sim.splitting import chronological_split +def _is_madrid_zone_layout(root: Path) -> bool: + freq_dirs = [p for p in root.glob("f*") if p.is_dir()] + if not freq_dirs: + return False + return any(list(fd.glob("downlink_*.csv")) for fd in freq_dirs) + + +def _load_madrid_zone_wide(root: Path) -> pd.DataFrame: + freq_dirs = sorted([p for p in root.glob("f*") if p.is_dir()]) + rows = [] + per_freq = {} + + for freq_dir in freq_dirs: + freq = freq_dir.name + dl_path = next(iter(sorted(freq_dir.glob("downlink_*.csv"))), None) + ul_path = next(iter(sorted(freq_dir.glob("uplink_*.csv"))), None) + users_path = next(iter(sorted(freq_dir.glob("users_*.csv"))), None) + if dl_path is None: + continue + + dl = pd.read_csv(dl_path) + dl["second"] = np.floor(pd.to_numeric(dl["timestamp"], errors="coerce")).astype("Int64") + dl["tbs_sum"] = pd.to_numeric(dl["tbs_sum"], errors="coerce") + dl = dl.dropna(subset=["second"]).groupby("second", as_index=False)["tbs_sum"].sum() + dl = dl.rename(columns={"tbs_sum": f"downlink_{freq}"}) + + ul = pd.DataFrame(columns=["second", f"uplink_{freq}"]) + if ul_path is not None: + ul_tmp = pd.read_csv(ul_path) + ul_tmp["second"] = np.floor(pd.to_numeric(ul_tmp["timestamp"], errors="coerce")).astype("Int64") + ul_tmp["tbs_sum"] = pd.to_numeric(ul_tmp["tbs_sum"], errors="coerce") + ul_tmp = ul_tmp.dropna(subset=["second"]).groupby("second", as_index=False)["tbs_sum"].sum() + ul = ul_tmp.rename(columns={"tbs_sum": f"uplink_{freq}"}) + + users = pd.DataFrame(columns=["second", f"users_{freq}"]) + if users_path is not None: + user_tmp = pd.read_csv(users_path) + user_tmp["second"] = np.floor(pd.to_numeric(user_tmp["timestamp"], errors="coerce")).astype("Int64") + user_tmp["user_unique"] = pd.to_numeric(user_tmp["user_unique"], errors="coerce") + user_tmp = user_tmp.dropna(subset=["second"]).groupby("second", as_index=False)["user_unique"].mean() + users = user_tmp.rename(columns={"user_unique": f"users_{freq}"}) + + merged = dl.merge(ul, on="second", how="outer").merge(users, on="second", how="outer").sort_values("second") + per_freq[freq] = merged + rows.extend(merged["second"].dropna().astype(int).tolist()) + + if not per_freq: + raise RuntimeError(f"No usable frequency data found in {root}") + + all_seconds = pd.DataFrame({"second": sorted(set(rows))}) + base = all_seconds.copy() + for freq in sorted(per_freq.keys()): + base = base.merge(per_freq[freq], on="second", how="left") + + for c in base.columns: + if c != "second": + base[c] = pd.to_numeric(base[c], errors="coerce") + base = base.sort_values("second").reset_index(drop=True) + feature_cols = [c for c in base.columns if c != "second"] + base[feature_cols] = base[feature_cols].ffill().fillna(0.0) + + down_cols = [c for c in base.columns if c.startswith("downlink_f")] + up_cols = [c for c in base.columns if c.startswith("uplink_f")] + user_cols = [c for c in base.columns if c.startswith("users_f")] + + base["timestamp"] = base["second"].astype(float) + base["time_ms"] = (base["second"].astype(float) * 1000.0).astype("int64") + base["traffic_load"] = base[down_cols].sum(axis=1) if down_cols else 0.0 + base["num_ues"] = base[user_cols].sum(axis=1) if user_cols else 0.0 + base["ul_buffer_bytes"] = base[up_cols].sum(axis=1) if up_cols else 0.0 + base["dl_buffer_bytes"] = base["traffic_load"] + base["scheduling_policy"] = root.name + base["reservation"] = root.name + + return base + + def _build_target(df: pd.DataFrame, target: str, horizon_steps: int) -> pd.DataFrame: horizon = max(1, horizon_steps) shifted = df["traffic_load"].shift(-horizon) @@ -66,10 +143,16 @@ def main() -> None: args = parser.parse_args() required_rows = args.steps + max(1, int(args.horizon_steps)) - base = load_timeseries_from_kpm(args.input, n_steps=required_rows, verbose=True) + input_path = Path(args.input) + if _is_madrid_zone_layout(input_path): + base = _load_madrid_zone_wide(input_path) + else: + base = load_timeseries_from_kpm(args.input, n_steps=required_rows, verbose=True) base = _build_target(base, args.target, args.horizon_steps) - keep_cols = ["time_ms", "reservation", "traffic_load"] + FEATURE_ORDER + ["target"] + keep_cols = ["timestamp", "time_ms", "reservation", "traffic_load"] + keep_cols += sorted([c for c in base.columns if c.startswith("downlink_f") or c.startswith("uplink_f") or c.startswith("users_f")]) + keep_cols += FEATURE_ORDER + ["target"] for c in keep_cols: if c not in base.columns: base[c] = 0 diff --git a/oran_sim/data.py b/oran_sim/data.py index 4074765..e351b37 100644 --- a/oran_sim/data.py +++ b/oran_sim/data.py @@ -192,51 +192,87 @@ def _scenario_context(exp_dir: Path) -> Dict[str, object]: } -def load_timeseries_from_kpm(root_dir: str | Path, n_steps: Optional[int] = None, verbose: bool = True) -> pd.DataFrame: - root = Path(root_dir) - reservations = sorted(root.glob("**/RESERVATION-*")) - if not reservations: - raise FileNotFoundError(f"No RESERVATION-* directories found under {root}") +def _load_madrid_lte_zone(root: Path, n_steps: Optional[int], verbose: bool) -> pd.DataFrame: + freq_dirs = sorted([p for p in root.glob("f*") if p.is_dir()]) + if not freq_dirs: + raise FileNotFoundError(f"No frequency folders (f*) found under {root}") frames: List[pd.DataFrame] = [] remaining = n_steps total_used = 0 - for exp in reservations: + + for freq_dir in freq_dirs: if remaining is not None and remaining <= 0: break - bs = _load_bs_metrics(exp) - if bs.empty: + downlink_path = next(iter(sorted(freq_dir.glob("downlink_*.csv"))), None) + uplink_path = next(iter(sorted(freq_dir.glob("uplink_*.csv"))), None) + users_path = next(iter(sorted(freq_dir.glob("users_*.csv"))), None) + if downlink_path is None: continue - start = float(bs["time_ms"].min()) - enb = _load_enb_metrics(exp) - ue = _load_ue_metrics(exp, start) - flow = _load_flow_metrics(exp, start) - merged = bs.sort_values("time_ms").copy() - merged["time_ms"] = pd.to_numeric(merged["time_ms"], errors="coerce").astype(float) - - if not enb.empty: - enb["time_ms"] = pd.to_numeric(enb["time_ms"], errors="coerce").astype(float) - merged = pd.merge_asof( - merged.sort_values("time_ms"), - enb.sort_values("time_ms"), - on="time_ms", - direction="nearest", - tolerance=300, - suffixes=("", "_enb"), - ) - if not ue.empty: - ue["time_ms"] = pd.to_numeric(ue["time_ms"], errors="coerce").astype(float) - merged = pd.merge_asof(merged.sort_values("time_ms"), ue.sort_values("time_ms"), on="time_ms", direction="nearest", tolerance=300) - if not flow.empty: - flow["time_ms"] = pd.to_numeric(flow["time_ms"], errors="coerce").astype(float) - merged = pd.merge_asof(merged.sort_values("time_ms"), flow.sort_values("time_ms"), on="time_ms", direction="nearest", tolerance=1000) - ctx = _scenario_context(exp) - for k, v in ctx.items(): - merged[k] = v - - merged["reservation"] = exp.name + dl = _read_csv(downlink_path, prefer_sep=",") + if dl.empty: + continue + dl.columns = [normalize_column_name(c) for c in dl.columns] + if "time" not in dl.columns: + continue + dl["time"] = pd.to_numeric(dl["time"], errors="coerce") + dl["tbs_sum"] = pd.to_numeric(dl.get("tbs_sum"), errors="coerce") + dl = dl.dropna(subset=["time"]).sort_values("time") + dl["time_ms"] = dl["time"] * 1000.0 + dl["traffic_load"] = dl["tbs_sum"].fillna(0.0) + merged = dl[["time_ms", "traffic_load"]].copy() + + if uplink_path is not None: + ul = _read_csv(uplink_path, prefer_sep=",") + if not ul.empty: + ul.columns = [normalize_column_name(c) for c in ul.columns] + if "time" not in ul.columns: + ul = pd.DataFrame() + if not ul.empty: + ul["time"] = pd.to_numeric(ul["time"], errors="coerce") + ul["tbs_sum"] = pd.to_numeric(ul.get("tbs_sum"), errors="coerce") + if not ul.empty: + ul = ul.dropna(subset=["time"]).sort_values("time") + ul["time_ms"] = ul["time"] * 1000.0 + ul = ul[["time_ms", "tbs_sum"]].rename(columns={"tbs_sum": "ul_buffer_bytes"}) + merged = pd.merge_asof( + merged.sort_values("time_ms"), + ul.sort_values("time_ms"), + on="time_ms", + direction="nearest", + tolerance=1500.0, + ) + + if users_path is not None: + users = _read_csv(users_path, prefer_sep=",") + if not users.empty: + users.columns = [normalize_column_name(c) for c in users.columns] + if "time" not in users.columns: + users = pd.DataFrame() + if not users.empty: + users["time"] = pd.to_numeric(users["time"], errors="coerce") + users["user_unique"] = pd.to_numeric(users.get("user_unique"), errors="coerce") + if not users.empty: + users = users.dropna(subset=["time"]).sort_values("time") + users["time_ms"] = users["time"] * 1000.0 + users = users[["time_ms", "user_unique"]].rename(columns={"user_unique": "num_ues"}) + merged = pd.merge_asof( + merged.sort_values("time_ms"), + users.sort_values("time_ms"), + on="time_ms", + direction="nearest", + tolerance=1500.0, + ) + + freq_digits = re.findall(r"\d+", freq_dir.name) + freq_value = float(freq_digits[0]) if freq_digits else 0.0 + merged["reservation"] = freq_dir.name + merged["slicing_enabled"] = 0.0 + merged["slice_id"] = freq_value + merged["slice_prb"] = 0.0 + merged["scheduling_policy"] = root.name fetched_rows = len(merged) if remaining is None: @@ -251,17 +287,88 @@ def load_timeseries_from_kpm(root_dir: str | Path, n_steps: Optional[int] = None total_used += used_rows if verbose: print( - f"[DATA] reservation={exp.name} fetched_rows={fetched_rows} " + f"[DATA] reservation={freq_dir.name} fetched_rows={fetched_rows} " f"used_rows={used_rows} cumulative_used={total_used}" ) if not frames: - raise RuntimeError("No usable data parsed from reservations") + raise RuntimeError(f"No usable Madrid LTE data parsed from {root}") + + return pd.concat(frames, ignore_index=True, sort=False) - df = pd.concat(frames, ignore_index=True, sort=False) + +def load_timeseries_from_kpm(root_dir: str | Path, n_steps: Optional[int] = None, verbose: bool = True) -> pd.DataFrame: + root = Path(root_dir) + reservations = sorted(root.glob("**/RESERVATION-*")) + if not reservations: + df = _load_madrid_lte_zone(root, n_steps=n_steps, verbose=verbose) + else: + frames: List[pd.DataFrame] = [] + remaining = n_steps + total_used = 0 + for exp in reservations: + if remaining is not None and remaining <= 0: + break + + bs = _load_bs_metrics(exp) + if bs.empty: + continue + start = float(bs["time_ms"].min()) + enb = _load_enb_metrics(exp) + ue = _load_ue_metrics(exp, start) + flow = _load_flow_metrics(exp, start) + merged = bs.sort_values("time_ms").copy() + merged["time_ms"] = pd.to_numeric(merged["time_ms"], errors="coerce").astype(float) + + if not enb.empty: + enb["time_ms"] = pd.to_numeric(enb["time_ms"], errors="coerce").astype(float) + merged = pd.merge_asof( + merged.sort_values("time_ms"), + enb.sort_values("time_ms"), + on="time_ms", + direction="nearest", + tolerance=300, + suffixes=("", "_enb"), + ) + if not ue.empty: + ue["time_ms"] = pd.to_numeric(ue["time_ms"], errors="coerce").astype(float) + merged = pd.merge_asof(merged.sort_values("time_ms"), ue.sort_values("time_ms"), on="time_ms", direction="nearest", tolerance=300) + if not flow.empty: + flow["time_ms"] = pd.to_numeric(flow["time_ms"], errors="coerce").astype(float) + merged = pd.merge_asof(merged.sort_values("time_ms"), flow.sort_values("time_ms"), on="time_ms", direction="nearest", tolerance=1000) + + ctx = _scenario_context(exp) + for k, v in ctx.items(): + merged[k] = v + + merged["reservation"] = exp.name + + fetched_rows = len(merged) + if remaining is None: + used_rows = fetched_rows + to_append = merged + else: + used_rows = min(fetched_rows, max(remaining, 0)) + to_append = merged.iloc[:used_rows].copy() + remaining -= used_rows + + frames.append(to_append) + total_used += used_rows + if verbose: + print( + f"[DATA] reservation={exp.name} fetched_rows={fetched_rows} " + f"used_rows={used_rows} cumulative_used={total_used}" + ) + + if not frames: + raise RuntimeError("No usable data parsed from reservations") + + df = pd.concat(frames, ignore_index=True, sort=False) df.columns = [normalize_column_name(c) for c in df.columns] - if "tx_brate_dl_mbps" in df.columns: + if "traffic_load" in df.columns: + df["traffic_load"] = pd.to_numeric(df["traffic_load"], errors="coerce") + elif "tx_brate_dl_mbps" in df.columns: df["traffic_load"] = pd.to_numeric(df["tx_brate_dl_mbps"], errors="coerce") elif "enb_dl_brate" in df.columns: df["traffic_load"] = pd.to_numeric(df["enb_dl_brate"], errors="coerce") diff --git a/scripts/train.py b/scripts/train.py index 0988160..2b2ebc4 100644 --- a/scripts/train.py +++ b/scripts/train.py @@ -58,9 +58,9 @@ def _load_splits(csv: Path) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, d return train_df, val_df, test_df, split_meta -def _prepare_tabular_preprocessor(features: list[str]) -> ColumnTransformer: - num_features = [c for c in features if c != "scheduling_policy"] - cat_features = [c for c in features if c == "scheduling_policy"] +def _prepare_tabular_preprocessor(features: list[str], train_df: pd.DataFrame) -> ColumnTransformer: + num_features = [c for c in features if pd.api.types.is_numeric_dtype(train_df[c])] + cat_features = [c for c in features if c not in num_features] return ColumnTransformer( transformers=[ ("num", StandardScaler(), num_features), @@ -88,6 +88,9 @@ def main() -> None: train_df, val_df, test_df, split_meta = _load_splits(csv) candidate_features = [c for c in FEATURE_ORDER if c in train_df.columns] + if not candidate_features: + skip = {"target", "time_ms", "timestamp", "reservation"} + candidate_features = [c for c in train_df.columns if c not in skip] importance_df = rank_features_by_importance(train_df, candidate_features, random_state=args.seed) feature_count = args.feature_count if args.feature_count is not None else len(candidate_features) features = select_top_k_features(importance_df, min(feature_count, len(candidate_features))) @@ -201,7 +204,7 @@ def encode(df: pd.DataFrame) -> pd.DataFrame: (out_dir / "metrics.json").write_text(json.dumps(final_metrics, indent=2), encoding="utf-8") pd.DataFrame(epoch_rows).to_csv(out_dir / "epoch_metrics.csv", index=False) else: - pre = _prepare_tabular_preprocessor(features) + pre = _prepare_tabular_preprocessor(features, train_df) model = build_model(args.model, args.seed) pipe = Pipeline([("pre", pre), ("model", model)]) x_train, y_train = train_df[features], train_df["target"].to_numpy() diff --git a/tests/test_madrid_zone_dataset.py b/tests/test_madrid_zone_dataset.py new file mode 100644 index 0000000..7f73900 --- /dev/null +++ b/tests/test_madrid_zone_dataset.py @@ -0,0 +1,86 @@ +from __future__ import annotations + +import subprocess +import sys +from pathlib import Path + +import pandas as pd + +from oran_sim.data import load_timeseries_from_kpm + + +ZONE_I = Path("dataset/madrid-lte-dataset/zoneI") + + +def test_load_timeseries_from_madrid_zone_i() -> None: + df = load_timeseries_from_kpm(ZONE_I, n_steps=64, verbose=False) + assert len(df) == 64 + assert "traffic_load" in df.columns + assert "num_ues" in df.columns + assert set(df["reservation"].unique()).issubset({"f796", "f1815", "f2650"}) + assert df["traffic_load"].astype(float).sum() > 0 + assert df["num_ues"].astype(float).max() > 0 + + +def test_generate_data_with_madrid_zone_i(tmp_path: Path) -> None: + out_csv = tmp_path / "traffic_data.csv" + subprocess.run( + [ + sys.executable, + "generate_data.py", + "--steps", + "100", + "--input", + str(ZONE_I), + "--output", + str(out_csv), + "--seed", + "42", + ], + check=True, + ) + + df = pd.read_csv(out_csv) + assert len(df) == 100 + assert {"traffic_load", "num_ues", "target"}.issubset(df.columns) + assert {"downlink_f796", "uplink_f796", "users_f796"}.issubset(df.columns) + assert {"downlink_f1815", "uplink_f1815", "users_f1815"}.issubset(df.columns) + assert {"downlink_f2650", "uplink_f2650", "users_f2650"}.issubset(df.columns) + assert df["traffic_load"].sum() > 0 + + +def test_train_pipeline_with_madrid_generated_data(tmp_path: Path) -> None: + csv_path = tmp_path / "traffic_data.csv" + out_dir = tmp_path / "model" + subprocess.run( + [ + sys.executable, + "generate_data.py", + "--steps", + "200", + "--input", + str(ZONE_I), + "--output", + str(csv_path), + ], + check=True, + ) + + subprocess.run( + [ + sys.executable, + "-m", + "scripts.train", + "--csv", + str(csv_path), + "--out_dir", + str(out_dir), + "--model", + "lightweight-32", + "--epochs", + "1", + ], + check=True, + ) + assert (out_dir / "model.joblib").exists() + assert (out_dir / "features.json").exists()