Skip to main content

powerio_matrix/io/
gridfm.rs

1//! GridFM interchange: a parsed case as the gridfm-datakit Parquet schema.
2//!
3//! [`gridfm-datakit`](https://github.com/gridfm) writes per-scenario Parquet
4//! tables that [`gridfm-graphkit`](https://github.com/gridfm)'s
5//! `HeteroGridDatasetDisk` trains a GNN on. This module emits the same four
6//! tables — `bus_data`, `gen_data`, `branch_data`, `y_bus_data` — from one
7//! parsed [`Network`], so graphkit can train on powerio output directly and the
8//! scenario-batch path (issue #14) has its on-disk format.
9//!
10//! The reverse — [`read_gridfm_dataset`] / [`read_gridfm_scenarios`] /
11//! [`gridfm_base_case`], with the pure [`read_gridfm_network`] over in-memory
12//! batches — rebuilds a [`Network`] from such a dataset (lossy but
13//! power-flow-complete; see [`GridfmRead`]), the ML→classical return leg
14//! (issue #60). One reader plus the existing writers means gridfm → any classical
15//! format. `y_bus_data` is ignored on read; branches carry raw `r/x/b`.
16//!
17//! # Snapshots and scenarios
18//!
19//! powerio has no power flow solver. One parsed case is one snapshot
20//! (`scenario = 0`): voltages and generator dispatch are the case's stored
21//! values, and branch flows `pf/qf/pt/qt` are computed from those voltages and
22//! the branch admittances (`branch_flows`). For a solved MATPOWER case the
23//! stored voltages are the converged operating point, so the flows match what a
24//! solver would report to float tolerance; for an unsolved/flat start case they
25//! are the flows at the stored voltages, not a re-solved dispatch.
26//!
27//! A scenario batch ([`write_gridfm_batch`] / [`gridfm_record_batches_batch`])
28//! row-stacks many snapshots into the four tables, keyed by the `scenario`
29//! column. The snapshots share a base element set — the same bus/branch/gen
30//! counts and bus-id ordering, so the dense bus index means the same bus across
31//! scenarios — enforced by the shape check ([`Error::ScenarioShapeMismatch`]).
32//! Within that, load, dispatch, voltages, branch status, bus type, and costs may
33//! all differ per snapshot. This matches datakit, whose topology variants (N-K,
34//! random component drop) toggle `BR_STATUS`/`GEN_STATUS` on a fixed element set,
35//! and graphkit's `HeteroGridDatasetDisk`, which groups by `scenario` and
36//! rebuilds the graph independently for each one. powerio doesn't generate the
37//! perturbations; a caller (e.g. a scenario generator) supplies the snapshots.
38//!
39//! # Units
40//!
41//! - `Pd, Qd, Pg, Qg, p_mw, q_mvar` are MW/MVAr, passed through from the case
42//!   (loads and generator setpoints are already MW/MVAr). The branch flows
43//!   `pf, qf, pt, qt` are MW/MVAr too, computed in per-unit and scaled by
44//!   `base_mva`.
45//! - `Vm` per-unit, `Va` degrees; `r, x, b` and the `Y**` admittances per-unit.
46//! - `GS, BS` are the MATPOWER shunt values (MW/MVAr at V = 1) divided by
47//!   `base_mva`, matching datakit's normalization.
48//! - Costs are the raw MATPOWER coefficients: `cp2 = c2`, `cp1 = c1`,
49//!   `cp0 = c0`. A cost row gridfm can't represent (piecewise, missing,
50//!   malformed, or cubic and higher) emits zeros — graphkit ignores the cost
51//!   columns — and is counted in the manifest. The `_eur` suffixes are
52//!   datakit's column names, not a unit powerio converts to.
53//! - `bus`, `from_bus`, `to_bus` are dense `[0, n)` indices; `idx` is the
54//!   0-based generator/branch row. An out-of-service branch keeps its physical
55//!   `Y**` admittances but carries zero flows (its `br_status` is 0).
56
57// Bus/branch indices and Y_bus nnz counts cast to `i64` for the Arrow columns;
58// they are bounded far below `i64::MAX`, so the wrap clippy warns about can't
59// happen.
60#![allow(clippy::cast_possible_wrap)]
61
62use std::path::{Path, PathBuf};
63use std::sync::Arc;
64
65use arrow::array::{Array, ArrayRef, Float64Array, Int64Array};
66use arrow::datatypes::{Field, Schema};
67use arrow::record_batch::RecordBatch;
68use num_complex::Complex64;
69use parquet::arrow::ArrowWriter;
70use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
71use parquet::basic::Compression;
72use parquet::file::properties::WriterProperties;
73use serde::Serialize;
74
75use crate::indexed::IndexedNetwork;
76use crate::matrix::{BuildOptions, YbusFlags, branch_admittance, branch_flows, build_ybus};
77use crate::network::{Branch, Bus, BusId, BusType, Extras, Generator, Load, Shunt, SourceFormat};
78use crate::{ElementCounts, Error, GenCost, Network, Result, ScenarioMismatch};
79
80/// Options for the gridfm export — the batch-wide knobs. The scenario id is a
81/// per-snapshot property (set via [`GridfmSnapshot::new`] / [`numbered_snapshots`],
82/// or the explicit argument to the single-case [`write_gridfm_dataset`] /
83/// [`gridfm_record_batches`]), not an option here.
84#[derive(Debug, Clone)]
85pub struct GridfmOptions {
86    /// Also write `y_bus_data.parquet`. graphkit reconstructs admittances from
87    /// the branch table and ignores it, but datakit emits it, so the default is
88    /// `true` for parity.
89    pub include_y_bus: bool,
90    /// Apply transformer tap ratios to the admittances. Default `true` (the
91    /// physical admittances datakit stores).
92    pub include_taps: bool,
93    /// Apply phase shifts to the admittances. Default `true`.
94    pub include_shifts: bool,
95}
96
97impl Default for GridfmOptions {
98    fn default() -> Self {
99        Self {
100            include_y_bus: true,
101            include_taps: true,
102            include_shifts: true,
103        }
104    }
105}
106
107impl GridfmOptions {
108    /// The Y_bus build flags these options select (only taps and shifts matter
109    /// for the gridfm admittances; the B'/B'' scheme and zero-impedance policy
110    /// don't apply).
111    fn build_options(&self) -> BuildOptions {
112        BuildOptions {
113            include_taps: self.include_taps,
114            include_shifts: self.include_shifts,
115            ..Default::default()
116        }
117    }
118}
119
120/// One snapshot in a gridfm scenario batch: a parsed [`Network`] and the scenario
121/// id stamped into its rows.
122///
123/// powerio has no solver, so each snapshot is an operating point a caller (e.g. a
124/// scenario generator) has already produced. Snapshots in one batch share a base
125/// element set — the same bus/branch/gen counts and bus-id ordering — so the
126/// dense bus index means the same bus across snapshots and the tables stay
127/// schema-consistent. The builders enforce that and otherwise return
128/// [`Error::ScenarioShapeMismatch`]. Within that, load, dispatch, voltages,
129/// branch status, bus type, and costs may all vary per snapshot — this mirrors
130/// gridfm-datakit, whose topology variants (N-K, random component drop) toggle
131/// `BR_STATUS`/`GEN_STATUS` on a fixed element set, and gridfm-graphkit, which
132/// rebuilds the graph independently for every scenario.
133#[derive(Debug, Clone, Copy)]
134pub struct GridfmSnapshot<'a> {
135    /// The parsed case for this scenario.
136    net: &'a Network,
137    /// The scenario id stamped into the `scenario`/`load_scenario_idx` columns.
138    scenario: i64,
139}
140
141impl<'a> GridfmSnapshot<'a> {
142    /// A snapshot pairing a network with the scenario id stamped into its rows.
143    /// For the common "k-th input is `base + k`" numbering, prefer
144    /// [`numbered_snapshots`], which assigns ids with checked arithmetic.
145    #[must_use]
146    pub fn new(net: &'a Network, scenario: i64) -> Self {
147        Self { net, scenario }
148    }
149}
150
151/// The gridfm-datakit tables as Arrow record batches. The Parquet writer builds
152/// from these; a deferred gridfm-schema Arrow C Data Interface export (issue #38)
153/// would reuse them. (The raw network Arrow export that ships in powerio-capi is
154/// a different, lighter schema.)
155///
156/// For a scenario batch the tables are row-stacked: each table holds the rows of
157/// every snapshot back-to-back, keyed by the `scenario` column (0-based dense bus
158/// indices and generator/branch `idx` reset per scenario).
159#[derive(Debug, Clone)]
160#[non_exhaustive]
161pub struct GridfmTables {
162    pub bus: RecordBatch,
163    pub generator: RecordBatch,
164    pub branch: RecordBatch,
165    /// `None` when [`GridfmOptions::include_y_bus`] is off — the table isn't
166    /// built (graphkit reconstructs admittances from the branch table anyway).
167    pub y_bus: Option<RecordBatch>,
168}
169
170/// What [`write_gridfm_dataset`] wrote, plus the counts of columns it had to zero
171/// (see the manifest) so a caller can surface them.
172#[derive(Debug, Clone)]
173pub struct GridfmOutputs {
174    pub dir: PathBuf,
175    pub files: Vec<PathBuf>,
176    /// Branches with `r² + x² = 0`, whose admittance/flow columns were zeroed.
177    pub dropped_zero_impedance: usize,
178    /// Generators whose cost row gridfm couldn't represent, whose `cp*` columns
179    /// were zeroed.
180    pub degenerate_cost_gens: usize,
181}
182
183#[derive(Serialize)]
184struct GridfmMeta {
185    case_name: String,
186    base_mva: f64,
187    /// The first snapshot's scenario id (the base for a batch).
188    scenario: i64,
189    /// Number of stacked scenarios (1 for a single case).
190    n_scenarios: usize,
191    schema: &'static str,
192    /// Shared base element set (equal across all snapshots by the shape check).
193    n_buses: usize,
194    n_branches: usize,
195    /// In-service branch count of the **first** snapshot; branch status may
196    /// differ per scenario, so this describes scenario 0, not the whole batch.
197    n_branches_in_service: usize,
198    n_gens: usize,
199    /// Reference (slack) bus of the **first** snapshot. Each snapshot resolves
200    /// its own reference and carries it in the bus `REF` / gen `is_slack_gen`
201    /// columns; this records scenario 0's.
202    reference_bus: usize,
203    /// Branches with `r² + x² = 0` (admittance/flow columns zeroed), summed over
204    /// every snapshot in the batch.
205    dropped_zero_impedance: usize,
206    /// Generators whose cost row gridfm can't represent (piecewise, missing,
207    /// malformed, or cubic and higher; `cp*` columns zeroed), summed over every
208    /// snapshot in the batch.
209    degenerate_cost_gens: usize,
210    files: Vec<String>,
211    powerio_version: String,
212}
213
214/// Build the four gridfm tables for one network, stamping `scenario` into the id
215/// columns. Pure (no I/O). A thin wrapper over [`gridfm_record_batches_batch`]
216/// for one snapshot.
217///
218/// # Errors
219/// [`Error::ReferenceBusCount`] unless the case has exactly one reference bus
220/// (graphkit needs a slack), [`Error::NormalizedGridfmSnapshot`] for a normalized
221/// input, [`Error::NonFiniteGridfmValue`] for a NaN/Inf physical quantity (or a
222/// NaN limit — `±Inf` limits are the valid unbounded sentinel) that would reach
223/// Parquet, [`Error::NonFiniteSusceptance`] if a finite branch impedance still
224/// yields a non-finite admittance, and [`Error::UnknownBus`] if a generator or
225/// branch references a bus the network doesn't define.
226pub fn gridfm_record_batches(
227    net: &Network,
228    scenario: i64,
229    opts: &GridfmOptions,
230) -> Result<GridfmTables> {
231    let snap = GridfmSnapshot::new(net, scenario);
232    gridfm_record_batches_batch(std::slice::from_ref(&snap), opts)
233}
234
235/// Build the four gridfm tables for a batch of scenarios, row-stacked and keyed
236/// by the `scenario` column. Pure (no I/O). Each snapshot carries its own
237/// scenario id; the `include_y_bus`/taps/shifts flags apply to every snapshot.
238///
239/// # Errors
240/// [`Error::EmptyScenarioBatch`] for an empty batch,
241/// [`Error::ScenarioShapeMismatch`] if the snapshots don't share one base element
242/// set (counts + bus-id order), plus everything [`gridfm_record_batches`] can
243/// return.
244pub fn gridfm_record_batches_batch(
245    snapshots: &[GridfmSnapshot],
246    opts: &GridfmOptions,
247) -> Result<GridfmTables> {
248    let views = snapshot_views(snapshots)?;
249    tables_from_views(&views, opts)
250}
251
252/// The four tables from already-built, shape-checked snapshot views. The Y_bus
253/// table is built only when [`GridfmOptions::include_y_bus`] is set — otherwise
254/// it's `None` and the per-snapshot `build_ybus` is skipped entirely.
255fn tables_from_views(views: &[SnapshotView], opts: &GridfmOptions) -> Result<GridfmTables> {
256    Ok(GridfmTables {
257        bus: bus_batch(views)?,
258        generator: gen_batch(views)?,
259        branch: branch_batch(views, opts)?,
260        y_bus: if opts.include_y_bus {
261            Some(y_bus_batch(views, opts)?)
262        } else {
263            None
264        },
265    })
266}
267
268/// A resolved snapshot: its indexed view, scenario id, and reference bus.
269struct SnapshotView<'a> {
270    view: IndexedNetwork<'a>,
271    scenario: i64,
272    ref_bus: usize,
273}
274
275/// Build and shape-check the views for a scenario batch. Every snapshot must
276/// resolve to exactly one reference bus and share the first snapshot's base
277/// element set (bus / branch / generator counts and bus-id ordering), so the
278/// row-stacked tables stay schema-consistent.
279fn snapshot_views<'a>(snapshots: &'a [GridfmSnapshot<'a>]) -> Result<Vec<SnapshotView<'a>>> {
280    let first = snapshots.first().ok_or(Error::EmptyScenarioBatch)?;
281    let expected = shape_of(first.net);
282    let expected_ids: Vec<BusId> = first.net.buses.iter().map(|b| b.id).collect();
283
284    let mut views = Vec::with_capacity(snapshots.len());
285    for (k, snap) in snapshots.iter().enumerate() {
286        let got = shape_of(snap.net);
287        if got != expected {
288            return Err(Error::ScenarioShapeMismatch {
289                index: k,
290                reason: ScenarioMismatch::Counts { expected, got },
291            });
292        }
293        let ids_match = snap
294            .net
295            .buses
296            .iter()
297            .map(|b| b.id)
298            .eq(expected_ids.iter().copied());
299        if !ids_match {
300            return Err(Error::ScenarioShapeMismatch {
301                index: k,
302                reason: ScenarioMismatch::BusOrder,
303            });
304        }
305        validate_snapshot_inputs(snap.net, snap.scenario)?;
306        let view = IndexedNetwork::new(snap.net);
307        let ref_bus = view.reference_bus_index()?;
308        views.push(SnapshotView {
309            view,
310            scenario: snap.scenario,
311            ref_bus,
312        });
313    }
314    Ok(views)
315}
316
317fn validate_snapshot_inputs(net: &Network, scenario: i64) -> Result<()> {
318    if net.is_normalized() {
319        return Err(Error::NormalizedGridfmSnapshot { scenario });
320    }
321    net.check_base_mva()?;
322
323    // Two tiers: physical quantities must be finite, while limit fields admit
324    // `±Inf` as the unbounded sentinel (the PowerModels reader defaults absent
325    // gen limits to `±Inf`, MATPOWER files carry literal `Inf` tokens, and
326    // Parquet stores `±Inf` natively) — only `NaN` is corrupt there.
327    for (row, b) in net.buses.iter().enumerate() {
328        finite(scenario, "bus", row, "vm", b.vm)?;
329        finite(scenario, "bus", row, "va", b.va)?;
330        finite(scenario, "bus", row, "base_kv", b.base_kv)?;
331        not_nan(scenario, "bus", row, "vmax", b.vmax)?;
332        not_nan(scenario, "bus", row, "vmin", b.vmin)?;
333    }
334    for (row, l) in net.loads.iter().enumerate() {
335        finite(scenario, "load", row, "p", l.p)?;
336        finite(scenario, "load", row, "q", l.q)?;
337    }
338    for (row, s) in net.shunts.iter().enumerate() {
339        finite(scenario, "shunt", row, "g", s.g)?;
340        finite(scenario, "shunt", row, "b", s.b)?;
341    }
342    for (row, br) in net.branches.iter().enumerate() {
343        finite(scenario, "branch", row, "r", br.r)?;
344        finite(scenario, "branch", row, "x", br.x)?;
345        finite(scenario, "branch", row, "b", br.b)?;
346        finite(scenario, "branch", row, "tap", br.tap)?;
347        finite(scenario, "branch", row, "shift", br.shift)?;
348        not_nan(scenario, "branch", row, "angmin", br.angmin)?;
349        not_nan(scenario, "branch", row, "angmax", br.angmax)?;
350        not_nan(scenario, "branch", row, "rate_a", br.rate_a)?;
351    }
352    for (row, g) in net.generators.iter().enumerate() {
353        finite(scenario, "generator", row, "pg", g.pg)?;
354        finite(scenario, "generator", row, "qg", g.qg)?;
355        not_nan(scenario, "generator", row, "pmax", g.pmax)?;
356        not_nan(scenario, "generator", row, "pmin", g.pmin)?;
357        not_nan(scenario, "generator", row, "qmax", g.qmax)?;
358        not_nan(scenario, "generator", row, "qmin", g.qmin)?;
359        // Validate exactly what lands in the `cp0/cp1/cp2` columns, not the
360        // raw coefficient list (non-representable rows export as zeros).
361        let (cp0, cp1, cp2) = gridfm_cost(g.cost.as_ref());
362        finite(scenario, "gencost", row, "cp0", cp0)?;
363        finite(scenario, "gencost", row, "cp1", cp1)?;
364        finite(scenario, "gencost", row, "cp2", cp2)?;
365    }
366    Ok(())
367}
368
369fn finite(
370    scenario: i64,
371    element: &'static str,
372    row: usize,
373    field: &'static str,
374    value: f64,
375) -> Result<()> {
376    if value.is_finite() {
377        Ok(())
378    } else {
379        Err(Error::NonFiniteGridfmValue {
380            scenario,
381            element,
382            row,
383            field,
384            value,
385        })
386    }
387}
388
389/// Limit fields: `±Inf` is the valid unbounded sentinel, only `NaN` is corrupt.
390fn not_nan(
391    scenario: i64,
392    element: &'static str,
393    row: usize,
394    field: &'static str,
395    value: f64,
396) -> Result<()> {
397    if value.is_nan() {
398        Err(Error::NonFiniteGridfmValue {
399            scenario,
400            element,
401            row,
402            field,
403            value,
404        })
405    } else {
406        Ok(())
407    }
408}
409
410/// The base element shape a scenario batch shares.
411fn shape_of(net: &Network) -> ElementCounts {
412    ElementCounts {
413        buses: net.buses.len(),
414        branches: net.branches.len(),
415        gens: net.generators.len(),
416    }
417}
418
419/// Number a list of networks into snapshots, stamping the k-th `base + k` — the
420/// one place the "k-th input is scenario `base + k`" rule lives, so the CLI and
421/// the Python binding can't drift. Checked: returns [`Error::ScenarioIdOverflow`]
422/// rather than wrapping or panicking if a scenario id exceeds `i64`.
423pub fn numbered_snapshots<'a>(nets: &[&'a Network], base: i64) -> Result<Vec<GridfmSnapshot<'a>>> {
424    nets.iter()
425        .enumerate()
426        .map(|(k, &net)| {
427            let scenario = i64::try_from(k)
428                .ok()
429                .and_then(|offset| base.checked_add(offset))
430                .ok_or(Error::ScenarioIdOverflow { base, index: k })?;
431            Ok(GridfmSnapshot::new(net, scenario))
432        })
433        .collect()
434}
435
436/// Write the gridfm-datakit Parquet dataset for one case under
437/// `out_dir/<network_name>/raw/`, matching datakit's directory layout. Stamps
438/// `scenario` into the id columns. Writes `bus_data.parquet`, `gen_data.parquet`,
439/// `branch_data.parquet`, optionally `y_bus_data.parquet`, and a
440/// `gridfm_meta.json` manifest.
441///
442/// Expects a raw snapshot (powers in MW, angles in degrees); pass the parsed
443/// `Network`, not a [`to_normalized`](powerio::Network::to_normalized) per-unit
444/// product, whose fields would be mislabeled.
445///
446/// # Errors
447/// Propagates [`gridfm_record_batches`] and any filesystem/Parquet error.
448pub fn write_gridfm_dataset(
449    net: &Network,
450    scenario: i64,
451    out_dir: impl AsRef<Path>,
452    opts: &GridfmOptions,
453) -> Result<GridfmOutputs> {
454    let snap = GridfmSnapshot::new(net, scenario);
455    write_gridfm_batch(std::slice::from_ref(&snap), out_dir, opts)
456}
457
458/// Write a batch of scenarios as one gridfm-datakit dataset under
459/// `out_dir/<network_name>/raw/`, row-stacking every snapshot's tables and keying
460/// them by the `scenario` column. The dataset name and the base element counts
461/// come from the first snapshot (shared across the batch by the shape check); the
462/// dropped/degenerate counts are summed over every snapshot, while `reference_bus`
463/// / `n_branches_in_service` record the first snapshot only (they can differ per
464/// scenario, so the manifest documents them as scenario 0's).
465///
466/// # Errors
467/// Propagates [`gridfm_record_batches_batch`] and any filesystem/Parquet error.
468pub fn write_gridfm_batch(
469    snapshots: &[GridfmSnapshot],
470    out_dir: impl AsRef<Path>,
471    opts: &GridfmOptions,
472) -> Result<GridfmOutputs> {
473    let views = snapshot_views(snapshots)?;
474    let tables = tables_from_views(&views, opts)?;
475
476    // The shape check guarantees every snapshot shares the base element set, so
477    // the name and structural counts come from the first.
478    let net = views[0].view.network();
479    let dir = out_dir.as_ref().join(&net.name).join("raw");
480    std::fs::create_dir_all(&dir)?;
481
482    let mut files = Vec::new();
483    put_parquet(&dir, "bus_data.parquet", &tables.bus, &mut files)?;
484    put_parquet(&dir, "gen_data.parquet", &tables.generator, &mut files)?;
485    put_parquet(&dir, "branch_data.parquet", &tables.branch, &mut files)?;
486    if let Some(y_bus) = &tables.y_bus {
487        put_parquet(&dir, "y_bus_data.parquet", y_bus, &mut files)?;
488    }
489
490    // Branch status and costs may differ per scenario, so count the zeroed rows
491    // across every snapshot — the totals describe the whole stacked dataset.
492    let dropped_zero_impedance: usize = views
493        .iter()
494        .flat_map(|v| v.view.network().branches.iter())
495        .filter(|br| br.r * br.r + br.x * br.x == 0.0)
496        .count();
497    let degenerate_cost_gens: usize = views
498        .iter()
499        .flat_map(|v| v.view.network().generators.iter())
500        .filter(|g| !cost_representable(g.cost.as_ref()))
501        .count();
502
503    let meta = GridfmMeta {
504        case_name: net.name.clone(),
505        base_mva: net.base_mva,
506        scenario: views[0].scenario,
507        n_scenarios: views.len(),
508        schema: "gridfm-datakit",
509        n_buses: net.buses.len(),
510        n_branches: net.branches.len(),
511        n_branches_in_service: net.branches.iter().filter(|b| b.in_service).count(),
512        n_gens: net.generators.len(),
513        reference_bus: views[0].ref_bus,
514        dropped_zero_impedance,
515        degenerate_cost_gens,
516        files: files
517            .iter()
518            .filter_map(|p| p.file_name().and_then(|s| s.to_str()).map(str::to_string))
519            .collect(),
520        powerio_version: env!("CARGO_PKG_VERSION").to_string(),
521    };
522    let meta_path = dir.join("gridfm_meta.json");
523    let json = serde_json::to_string_pretty(&meta).map_err(|e| Error::Parquet(e.to_string()))?;
524    std::fs::write(&meta_path, json)?;
525    files.push(meta_path);
526
527    Ok(GridfmOutputs {
528        dir,
529        files,
530        dropped_zero_impedance,
531        degenerate_cost_gens,
532    })
533}
534
535// --- table builders --------------------------------------------------------
536
537fn bus_batch(snaps: &[SnapshotView]) -> Result<RecordBatch> {
538    let total: usize = snaps.iter().map(|s| s.view.n()).sum();
539    let mut scenario = Vec::with_capacity(total);
540    let mut bus_idx = Vec::with_capacity(total);
541    let (mut pd, mut qd) = (Vec::with_capacity(total), Vec::with_capacity(total));
542    let (mut pg_col, mut qg_col) = (Vec::with_capacity(total), Vec::with_capacity(total));
543    let (mut vm, mut va) = (Vec::with_capacity(total), Vec::with_capacity(total));
544    let (mut pq, mut pv, mut refc) = (
545        Vec::with_capacity(total),
546        Vec::with_capacity(total),
547        Vec::with_capacity(total),
548    );
549    let mut vn_kv = Vec::with_capacity(total);
550    let (mut min_vm, mut max_vm) = (Vec::with_capacity(total), Vec::with_capacity(total));
551    let (mut gs, mut bs) = (Vec::with_capacity(total), Vec::with_capacity(total));
552
553    for s in snaps {
554        let view = &s.view;
555        let n = view.n();
556        let base = view.base_mva();
557        let buses = &view.network().buses;
558
559        // Per-bus generation, summed over in-service generators (dense order).
560        let mut pg = vec![0.0; n];
561        let mut qg = vec![0.0; n];
562        for (_, g) in view.in_service_gens() {
563            if let Some(i) = view.bus_index(g.bus) {
564                pg[i] += g.pg;
565                qg[i] += g.qg;
566            }
567        }
568
569        scenario.resize(scenario.len() + n, s.scenario);
570        bus_idx.extend(0..n as i64);
571        pd.extend_from_slice(view.pd());
572        qd.extend_from_slice(view.qd());
573        pg_col.extend(pg);
574        qg_col.extend(qg);
575        vm.extend(buses.iter().map(|b| b.vm));
576        va.extend(buses.iter().map(|b| b.va));
577        pq.extend(buses.iter().map(|b| i64::from(b.kind == BusType::Pq)));
578        pv.extend(buses.iter().map(|b| i64::from(b.kind == BusType::Pv)));
579        refc.extend(buses.iter().map(|b| i64::from(b.kind == BusType::Ref)));
580        vn_kv.extend(buses.iter().map(|b| b.base_kv));
581        min_vm.extend(buses.iter().map(|b| b.vmin));
582        max_vm.extend(buses.iter().map(|b| b.vmax));
583        gs.extend(view.gs().iter().map(|g| g / base));
584        bs.extend(view.bs().iter().map(|b| b / base));
585    }
586
587    batch(with_scenario_pair(
588        scenario,
589        vec![
590            ("bus", i64s(bus_idx)),
591            ("Pd", f64s(pd)),
592            ("Qd", f64s(qd)),
593            ("Pg", f64s(pg_col)),
594            ("Qg", f64s(qg_col)),
595            ("Vm", f64s(vm)),
596            ("Va", f64s(va)),
597            ("PQ", i64s(pq)),
598            ("PV", i64s(pv)),
599            ("REF", i64s(refc)),
600            ("vn_kv", f64s(vn_kv)),
601            ("min_vm_pu", f64s(min_vm)),
602            ("max_vm_pu", f64s(max_vm)),
603            ("GS", f64s(gs)),
604            ("BS", f64s(bs)),
605        ],
606    ))
607}
608
609fn gen_batch(snaps: &[SnapshotView]) -> Result<RecordBatch> {
610    let total: usize = snaps.iter().map(|s| s.view.generators().len()).sum();
611    let mut scenario = Vec::with_capacity(total);
612    let mut idx = Vec::with_capacity(total);
613    let mut bus = Vec::with_capacity(total);
614    let (mut p_mw, mut q_mvar) = (Vec::with_capacity(total), Vec::with_capacity(total));
615    let (mut min_p, mut max_p) = (Vec::with_capacity(total), Vec::with_capacity(total));
616    let (mut min_q, mut max_q) = (Vec::with_capacity(total), Vec::with_capacity(total));
617    let (mut cp0, mut cp1, mut cp2) = (
618        Vec::with_capacity(total),
619        Vec::with_capacity(total),
620        Vec::with_capacity(total),
621    );
622    let mut in_service = Vec::with_capacity(total);
623    let mut is_slack = Vec::with_capacity(total);
624
625    for s in snaps {
626        let view = &s.view;
627        // One pass over the snapshot's generators: every column gets one push per
628        // generator, in dense source order.
629        for (row, g) in view.generators().iter().enumerate() {
630            let i = view.bus_index(g.bus).ok_or(Error::UnknownBus {
631                bus_id: g.bus,
632                element_index: row,
633            })?;
634            scenario.push(s.scenario);
635            idx.push(row as i64);
636            bus.push(i as i64);
637            is_slack.push(i64::from(i == s.ref_bus));
638            let (c0, c1, c2) = gridfm_cost(g.cost.as_ref());
639            cp0.push(c0);
640            cp1.push(c1);
641            cp2.push(c2);
642            p_mw.push(g.pg);
643            q_mvar.push(g.qg);
644            min_p.push(g.pmin);
645            max_p.push(g.pmax);
646            min_q.push(g.qmin);
647            max_q.push(g.qmax);
648            in_service.push(i64::from(g.in_service));
649        }
650    }
651
652    batch(with_scenario_pair(
653        scenario,
654        vec![
655            ("idx", i64s(idx)),
656            ("bus", i64s(bus)),
657            ("p_mw", f64s(p_mw)),
658            ("q_mvar", f64s(q_mvar)),
659            ("min_p_mw", f64s(min_p)),
660            ("max_p_mw", f64s(max_p)),
661            ("min_q_mvar", f64s(min_q)),
662            ("max_q_mvar", f64s(max_q)),
663            ("cp0_eur", f64s(cp0)),
664            ("cp1_eur_per_mw", f64s(cp1)),
665            ("cp2_eur_per_mw2", f64s(cp2)),
666            ("in_service", i64s(in_service)),
667            ("is_slack_gen", i64s(is_slack)),
668        ],
669    ))
670}
671
672#[allow(clippy::too_many_lines, clippy::many_single_char_names)]
673fn branch_batch(snaps: &[SnapshotView], opts: &GridfmOptions) -> Result<RecordBatch> {
674    let total: usize = snaps.iter().map(|s| s.view.branches().len()).sum();
675
676    // Same flags the Y_bus builder derives, so the branch admittance columns and
677    // y_bus_data come from one kernel. The taps/shifts flags are batch-wide (from
678    // `opts`); the admittances themselves are recomputed per snapshot.
679    let flags = YbusFlags {
680        unity_taps: !opts.include_taps,
681        zero_shifts: !opts.include_shifts,
682        ..Default::default()
683    };
684
685    let mut scenario = Vec::with_capacity(total);
686    let mut idx = Vec::with_capacity(total);
687    let (mut from_bus, mut to_bus) = (Vec::with_capacity(total), Vec::with_capacity(total));
688    let (mut pf, mut qf, mut pt, mut qt) = (
689        Vec::with_capacity(total),
690        Vec::with_capacity(total),
691        Vec::with_capacity(total),
692        Vec::with_capacity(total),
693    );
694    let (mut yff_r, mut yff_i) = (Vec::with_capacity(total), Vec::with_capacity(total));
695    let (mut yft_r, mut yft_i) = (Vec::with_capacity(total), Vec::with_capacity(total));
696    let (mut ytf_r, mut ytf_i) = (Vec::with_capacity(total), Vec::with_capacity(total));
697    let (mut ytt_r, mut ytt_i) = (Vec::with_capacity(total), Vec::with_capacity(total));
698    let (mut r_col, mut x_col, mut b_col) = (
699        Vec::with_capacity(total),
700        Vec::with_capacity(total),
701        Vec::with_capacity(total),
702    );
703    let (mut tap, mut shift) = (Vec::with_capacity(total), Vec::with_capacity(total));
704    let (mut ang_min, mut ang_max) = (Vec::with_capacity(total), Vec::with_capacity(total));
705    let mut rate_a = Vec::with_capacity(total);
706    let mut br_status = Vec::with_capacity(total);
707
708    for s in snaps {
709        let view = &s.view;
710        let base = view.base_mva();
711        let branches = view.branches();
712        let buses = &view.network().buses;
713        // Complex bus voltages `vm·e^{jθ}`, dense order, for the flow evaluation.
714        let v: Vec<Complex64> = buses
715            .iter()
716            .map(|b| Complex64::from_polar(b.vm, b.va.to_radians()))
717            .collect();
718
719        scenario.resize(scenario.len() + branches.len(), s.scenario);
720        idx.extend(0..branches.len() as i64);
721
722        for (row, br) in branches.iter().enumerate() {
723            let i = view.bus_index(br.from).ok_or(Error::UnknownBus {
724                bus_id: br.from,
725                element_index: row,
726            })?;
727            let j = view.bus_index(br.to).ok_or(Error::UnknownBus {
728                bus_id: br.to,
729                element_index: row,
730            })?;
731            from_bus.push(i as i64);
732            to_bus.push(j as i64);
733
734            // Zero-impedance branch → `None` → zeroed admittance/flow columns (never NaN).
735            let shift_rad = if flags.zero_shifts {
736                0.0
737            } else {
738                view.angle_radians(br.shift)
739            };
740            let block = branch_admittance(br, flags, shift_rad, row)?;
741            let [y_ff, y_ft, y_tf, y_tt] = block.unwrap_or([Complex64::new(0.0, 0.0); 4]);
742            yff_r.push(y_ff.re);
743            yff_i.push(y_ff.im);
744            yft_r.push(y_ft.re);
745            yft_i.push(y_ft.im);
746            ytf_r.push(y_tf.re);
747            ytf_i.push(y_tf.im);
748            ytt_r.push(y_tt.re);
749            ytt_i.push(y_tt.im);
750
751            let (sf, st) = if br.in_service && block.is_some() {
752                branch_flows(&[y_ff, y_ft, y_tf, y_tt], v[i], v[j])
753            } else {
754                (Complex64::new(0.0, 0.0), Complex64::new(0.0, 0.0))
755            };
756            pf.push(sf.re * base);
757            qf.push(sf.im * base);
758            pt.push(st.re * base);
759            qt.push(st.im * base);
760
761            r_col.push(br.r);
762            x_col.push(br.x);
763            b_col.push(br.b);
764            tap.push(br.effective_tap());
765            shift.push(br.shift);
766            ang_min.push(br.angmin);
767            ang_max.push(br.angmax);
768            rate_a.push(br.rate_a);
769            br_status.push(i64::from(br.in_service));
770        }
771    }
772
773    batch(with_scenario_pair(
774        scenario,
775        vec![
776            ("idx", i64s(idx)),
777            ("from_bus", i64s(from_bus)),
778            ("to_bus", i64s(to_bus)),
779            ("pf", f64s(pf)),
780            ("qf", f64s(qf)),
781            ("pt", f64s(pt)),
782            ("qt", f64s(qt)),
783            ("r", f64s(r_col)),
784            ("x", f64s(x_col)),
785            ("b", f64s(b_col)),
786            ("Yff_r", f64s(yff_r)),
787            ("Yff_i", f64s(yff_i)),
788            ("Yft_r", f64s(yft_r)),
789            ("Yft_i", f64s(yft_i)),
790            ("Ytf_r", f64s(ytf_r)),
791            ("Ytf_i", f64s(ytf_i)),
792            ("Ytt_r", f64s(ytt_r)),
793            ("Ytt_i", f64s(ytt_i)),
794            ("tap", f64s(tap)),
795            ("shift", f64s(shift)),
796            ("ang_min", f64s(ang_min)),
797            ("ang_max", f64s(ang_max)),
798            ("rate_a", f64s(rate_a)),
799            ("br_status", i64s(br_status)),
800        ],
801    ))
802}
803
804fn y_bus_batch(snaps: &[SnapshotView], opts: &GridfmOptions) -> Result<RecordBatch> {
805    // Upper bound on stacked nnz: each snapshot's Y_bus has at most 4 entries per
806    // branch plus a diagonal. The exact count varies (lossless branches, dropped
807    // zeros, per-scenario branch status), so this only sizes the allocation.
808    let est: usize = snaps
809        .iter()
810        .map(|s| 4 * s.view.branches().len() + s.view.n())
811        .sum();
812    let mut scenario = Vec::with_capacity(est);
813    let mut index1 = Vec::with_capacity(est);
814    let mut index2 = Vec::with_capacity(est);
815    let mut g_vals = Vec::with_capacity(est);
816    let mut b_vals = Vec::with_capacity(est);
817
818    for s in snaps {
819        let parts = build_ybus(&s.view, &opts.build_options())?;
820        // G and B don't share a sparsity pattern: a lossless branch (r = 0) is a
821        // pure reactance, so its G entries are structurally zero where B's aren't.
822        // datakit keys y_bus rows on the complex value being nonzero, i.e. the
823        // union of the G and B positions. Merge into a sorted (row, col) map so the
824        // output is row-major like `np.nonzero`, then drop any all-zero position.
825        let mut entries: std::collections::BTreeMap<(usize, usize), (f64, f64)> =
826            std::collections::BTreeMap::new();
827        for (row, g_row) in parts.g.outer_iterator().enumerate() {
828            for (col, &gv) in g_row.iter() {
829                entries.entry((row, col)).or_default().0 = gv;
830            }
831        }
832        for (row, b_row) in parts.b.outer_iterator().enumerate() {
833            for (col, &bv) in b_row.iter() {
834                entries.entry((row, col)).or_default().1 = bv;
835            }
836        }
837
838        for ((row, col), (gv, bv)) in entries {
839            if gv == 0.0 && bv == 0.0 {
840                continue;
841            }
842            scenario.push(s.scenario);
843            index1.push(row as i64);
844            index2.push(col as i64);
845            g_vals.push(gv);
846            b_vals.push(bv);
847        }
848    }
849
850    batch(with_scenario_pair(
851        scenario,
852        vec![
853            ("index1", i64s(index1)),
854            ("index2", i64s(index2)),
855            ("G", f64s(g_vals)),
856            ("B", f64s(b_vals)),
857        ],
858    ))
859}
860
861// --- small helpers ---------------------------------------------------------
862
863/// `(cp0, cp1, cp2)` = raw MATPOWER `(c0, c1, c2)` from a polynomial cost row.
864/// Coefficients are highest-order first, so `ncost == 3` is `[c2, c1, c0]`.
865/// Piecewise, missing, or malformed rows give zeros.
866fn gridfm_cost(cost: Option<&GenCost>) -> (f64, f64, f64) {
867    match cost {
868        Some(c) if c.model == 2 && c.coeffs.len() >= c.ncost => match c.ncost {
869            3 => (c.coeffs[2], c.coeffs[1], c.coeffs[0]),
870            2 => (c.coeffs[1], c.coeffs[0], 0.0),
871            1 => (c.coeffs[0], 0.0, 0.0),
872            _ => (0.0, 0.0, 0.0),
873        },
874        _ => (0.0, 0.0, 0.0),
875    }
876}
877
878/// Whether [`gridfm_cost`] can represent this cost row (used for the manifest's
879/// degenerate-cost count).
880fn cost_representable(cost: Option<&GenCost>) -> bool {
881    matches!(cost, Some(c) if c.model == 2 && c.coeffs.len() >= c.ncost && (1..=3).contains(&c.ncost))
882}
883
884fn put_parquet(
885    dir: &Path,
886    name: &str,
887    batch: &RecordBatch,
888    files: &mut Vec<PathBuf>,
889) -> Result<()> {
890    let path = dir.join(name);
891    let file = std::fs::File::create(&path)?;
892    let props = WriterProperties::builder()
893        .set_compression(Compression::SNAPPY)
894        .build();
895    let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props))
896        .map_err(|e| Error::Parquet(e.to_string()))?;
897    writer
898        .write(batch)
899        .map_err(|e| Error::Parquet(e.to_string()))?;
900    writer.close().map_err(|e| Error::Parquet(e.to_string()))?;
901    files.push(path);
902    Ok(())
903}
904
905/// Assemble a [`RecordBatch`] from named columns, in order; field types are read
906/// off the arrays and all columns are non-null.
907fn batch(columns: Vec<(&str, ArrayRef)>) -> Result<RecordBatch> {
908    let fields: Vec<Field> = columns
909        .iter()
910        .map(|(name, arr)| Field::new(*name, arr.data_type().clone(), false))
911        .collect();
912    let arrays: Vec<ArrayRef> = columns.into_iter().map(|(_, arr)| arr).collect();
913    RecordBatch::try_new(Arc::new(Schema::new(fields)), arrays)
914        .map_err(|e| Error::Parquet(e.to_string()))
915}
916
917fn i64s(v: Vec<i64>) -> ArrayRef {
918    Arc::new(Int64Array::from(v))
919}
920
921fn f64s(v: Vec<f64>) -> ArrayRef {
922    Arc::new(Float64Array::from(v))
923}
924
925/// Prepend the `scenario` / `load_scenario_idx` id columns (which hold identical
926/// values) to a table's other columns. The Int64 array is built once and the Arc
927/// shared, so the duplicate column costs a pointer, not a second `Vec`.
928fn with_scenario_pair(
929    scenario: Vec<i64>,
930    rest: Vec<(&'static str, ArrayRef)>,
931) -> Vec<(&'static str, ArrayRef)> {
932    let scenario = i64s(scenario);
933    let mut cols = Vec::with_capacity(rest.len() + 2);
934    cols.push(("scenario", scenario.clone()));
935    cols.push(("load_scenario_idx", scenario));
936    cols.extend(rest);
937    cols
938}
939
940// === Reader: gridfm dataset → Network (issue #60) ==========================
941//
942// The inverse of the writer above: select one `scenario` out of the gridfm tables
943// and rebuild a `Network`. Lossy but power-flow-complete — original bus ids are
944// synthesized `1..n`, nodal load/shunt is folded to one synthetic element per
945// bus, and HVDC/storage/piecewise costs are gone (see [`GridfmRead::warnings`]).
946// `y_bus_data` is never read; branches carry raw `r/x/b`.
947
948/// One scenario read out of a gridfm dataset: the reconstructed [`Network`] plus
949/// the fidelity warnings the lossy read couldn't avoid (mirroring
950/// [`Conversion::warnings`](powerio::Conversion)).
951#[derive(Debug, Clone)]
952#[non_exhaustive]
953pub struct GridfmRead {
954    /// The reconstructed network (`source_format = SourceFormat::Gridfm`).
955    pub network: Network,
956    /// The scenario id these rows came from.
957    pub scenario: i64,
958    /// What the gridfm schema couldn't round-trip — synthesized bus ids, folded
959    /// per-bus load/shunt, dropped HVDC/storage, etc.
960    pub warnings: Vec<String>,
961}
962
963/// Build one [`Network`] from in-memory gridfm tables, selecting `scenario`'s
964/// rows. The pure inverse of [`gridfm_record_batches`]: `base_mva` and `name`
965/// come from the caller (the disk path reads them from `gridfm_meta.json`).
966///
967/// # Errors
968/// [`Error::FormatRead`] if a required column is missing or mistyped, a column
969/// carries nulls, a dense index is negative, or `scenario` isn't present; plus
970/// whatever [`Network::validate`] rejects (duplicate / dangling bus ids).
971pub fn read_gridfm_network(
972    tables: &GridfmTables,
973    scenario: i64,
974    base_mva: f64,
975    name: &str,
976) -> Result<GridfmRead> {
977    let bus = bus_columns(std::slice::from_ref(&tables.bus))?;
978    let gens = gen_columns(std::slice::from_ref(&tables.generator))?;
979    let branch = branch_columns(std::slice::from_ref(&tables.branch))?;
980    build_network_from_columns(&bus, &gens, &branch, scenario, base_mva, name, Vec::new())
981}
982
983/// Read one `scenario` from a gridfm dataset on disk and rebuild a [`Network`].
984/// The inverse of [`write_gridfm_dataset`].
985///
986/// `dir` is resolved leniently: the leaf `raw/` directory holding the parquet
987/// files, a `<case>/` directory with a `raw/` child, or a parent directory with
988/// exactly one `*/raw/` child all work. `base_mva` and the case name come from
989/// `gridfm_meta.json` (a missing manifest defaults `base_mva` to 100 and warns).
990///
991/// # Errors
992/// Propagates [`read_gridfm_network`] plus any filesystem / Parquet read error.
993pub fn read_gridfm_dataset(dir: impl AsRef<Path>, scenario: i64) -> Result<GridfmRead> {
994    let raw = resolve_raw_dir(dir.as_ref())?;
995    let (base_mva, name, warnings) = read_meta(&raw);
996    let bus = bus_columns(&read_parquet(&raw.join("bus_data.parquet"))?)?;
997    let gens = gen_columns(&read_parquet(&raw.join("gen_data.parquet"))?)?;
998    let branch = branch_columns(&read_parquet(&raw.join("branch_data.parquet"))?)?;
999    build_network_from_columns(&bus, &gens, &branch, scenario, base_mva, &name, warnings)
1000}
1001
1002/// Read every scenario from a gridfm dataset, one [`Network`] per `scenario` id
1003/// (sorted ascending) over the shared topology — the read side of the scenario
1004/// batch (#57). Each scenario is rebuilt independently, so two scenarios may
1005/// differ in branch status, bus types, and reference bus.
1006///
1007/// # Errors
1008/// Propagates [`read_gridfm_dataset`]'s filesystem / Parquet / build errors.
1009pub fn read_gridfm_scenarios(dir: impl AsRef<Path>) -> Result<Vec<GridfmRead>> {
1010    let raw = resolve_raw_dir(dir.as_ref())?;
1011    let (base_mva, name, warnings) = read_meta(&raw);
1012    // Extract every column once and reuse across scenarios; rebuilding each
1013    // scenario from the raw batches would re-concatenate each table n_scenarios
1014    // times (O(n_scenarios × table_size)).
1015    let bus = bus_columns(&read_parquet(&raw.join("bus_data.parquet"))?)?;
1016    let gens = gen_columns(&read_parquet(&raw.join("gen_data.parquet"))?)?;
1017    let branch = branch_columns(&read_parquet(&raw.join("branch_data.parquet"))?)?;
1018
1019    distinct_sorted(&bus.scenario)
1020        .into_iter()
1021        .map(|s| {
1022            build_network_from_columns(&bus, &gens, &branch, s, base_mva, &name, warnings.clone())
1023        })
1024        .collect()
1025}
1026
1027/// The distinct scenario ids in a gridfm dataset, ascending — the keys
1028/// [`read_gridfm_scenarios`] rebuilds a [`Network`] for. Reads only `bus_data`'s
1029/// scenario column, so it enumerates a dataset's scenarios without rebuilding
1030/// every network; the C ABI's `pio_gridfm_scenario_ids` is a thin wrapper over it.
1031///
1032/// # Errors
1033/// Propagates the directory resolution and `bus_data.parquet` read errors.
1034pub fn gridfm_scenario_ids(dir: impl AsRef<Path>) -> Result<Vec<i64>> {
1035    let raw = resolve_raw_dir(dir.as_ref())?;
1036    let bus = bus_columns(&read_parquet(&raw.join("bus_data.parquet"))?)?;
1037    Ok(distinct_sorted(&bus.scenario))
1038}
1039
1040/// The distinct values of `scenario`, ascending.
1041fn distinct_sorted(scenario: &[i64]) -> Vec<i64> {
1042    let mut ids = scenario.to_vec();
1043    ids.sort_unstable();
1044    ids.dedup();
1045    ids
1046}
1047
1048/// The unperturbed base case: [`read_gridfm_dataset`] at `scenario = 0` (datakit's
1049/// convention). There is no single "shared base" beyond a chosen scenario — bus
1050/// types, branch status, and reference bus all vary per scenario — so the base
1051/// case is just scenario 0.
1052///
1053/// # Errors
1054/// Propagates [`read_gridfm_dataset`].
1055pub fn gridfm_base_case(dir: impl AsRef<Path>) -> Result<GridfmRead> {
1056    read_gridfm_dataset(dir, 0)
1057}
1058
1059/// Every `bus_data` column the reader uses, concatenated across all batches once.
1060/// Extracting columns up front lets a multi-scenario read reuse them rather than
1061/// re-concatenating the whole table for each scenario.
1062struct BusColumns {
1063    scenario: Vec<i64>,
1064    bus: Vec<i64>,
1065    pv: Vec<i64>,
1066    refc: Vec<i64>,
1067    vm: Vec<f64>,
1068    va: Vec<f64>,
1069    vn_kv: Vec<f64>,
1070    min_vm: Vec<f64>,
1071    max_vm: Vec<f64>,
1072    pd: Vec<f64>,
1073    qd: Vec<f64>,
1074    gs: Vec<f64>,
1075    bs: Vec<f64>,
1076}
1077
1078fn bus_columns(batches: &[RecordBatch]) -> Result<BusColumns> {
1079    Ok(BusColumns {
1080        scenario: i64_col(batches, "scenario")?,
1081        bus: i64_col(batches, "bus")?,
1082        pv: i64_col(batches, "PV")?,
1083        refc: i64_col(batches, "REF")?,
1084        vm: f64_col(batches, "Vm")?,
1085        va: f64_col(batches, "Va")?,
1086        vn_kv: f64_col(batches, "vn_kv")?,
1087        min_vm: f64_col(batches, "min_vm_pu")?,
1088        max_vm: f64_col(batches, "max_vm_pu")?,
1089        pd: f64_col(batches, "Pd")?,
1090        qd: f64_col(batches, "Qd")?,
1091        gs: f64_col(batches, "GS")?,
1092        bs: f64_col(batches, "BS")?,
1093    })
1094}
1095
1096/// Every `gen_data` column the reader uses (cost is `cp0`/`cp1`/`cp2`).
1097struct GenColumns {
1098    scenario: Vec<i64>,
1099    bus: Vec<i64>,
1100    p_mw: Vec<f64>,
1101    q_mvar: Vec<f64>,
1102    min_p: Vec<f64>,
1103    max_p: Vec<f64>,
1104    min_q: Vec<f64>,
1105    max_q: Vec<f64>,
1106    cp0: Vec<f64>,
1107    cp1: Vec<f64>,
1108    cp2: Vec<f64>,
1109    in_service: Vec<i64>,
1110}
1111
1112fn gen_columns(batches: &[RecordBatch]) -> Result<GenColumns> {
1113    Ok(GenColumns {
1114        scenario: i64_col(batches, "scenario")?,
1115        bus: i64_col(batches, "bus")?,
1116        p_mw: f64_col(batches, "p_mw")?,
1117        q_mvar: f64_col(batches, "q_mvar")?,
1118        min_p: f64_col(batches, "min_p_mw")?,
1119        max_p: f64_col(batches, "max_p_mw")?,
1120        min_q: f64_col(batches, "min_q_mvar")?,
1121        max_q: f64_col(batches, "max_q_mvar")?,
1122        cp0: f64_col(batches, "cp0_eur")?,
1123        cp1: f64_col(batches, "cp1_eur_per_mw")?,
1124        cp2: f64_col(batches, "cp2_eur_per_mw2")?,
1125        in_service: i64_col(batches, "in_service")?,
1126    })
1127}
1128
1129/// Every `branch_data` column the reader uses (`Y**` / flow columns are ignored).
1130struct BranchColumns {
1131    scenario: Vec<i64>,
1132    from_bus: Vec<i64>,
1133    to_bus: Vec<i64>,
1134    r: Vec<f64>,
1135    x: Vec<f64>,
1136    b: Vec<f64>,
1137    tap: Vec<f64>,
1138    shift: Vec<f64>,
1139    ang_min: Vec<f64>,
1140    ang_max: Vec<f64>,
1141    rate_a: Vec<f64>,
1142    status: Vec<i64>,
1143}
1144
1145fn branch_columns(batches: &[RecordBatch]) -> Result<BranchColumns> {
1146    Ok(BranchColumns {
1147        scenario: i64_col(batches, "scenario")?,
1148        from_bus: i64_col(batches, "from_bus")?,
1149        to_bus: i64_col(batches, "to_bus")?,
1150        r: f64_col(batches, "r")?,
1151        x: f64_col(batches, "x")?,
1152        b: f64_col(batches, "b")?,
1153        tap: f64_col(batches, "tap")?,
1154        shift: f64_col(batches, "shift")?,
1155        ang_min: f64_col(batches, "ang_min")?,
1156        ang_max: f64_col(batches, "ang_max")?,
1157        rate_a: f64_col(batches, "rate_a")?,
1158        status: i64_col(batches, "br_status")?,
1159    })
1160}
1161
1162/// The shared core: rebuild one scenario's [`Network`] from already-extracted
1163/// columns. The columns are concatenated once by the caller and reused across
1164/// scenarios, so a multi-scenario read doesn't re-copy each table per scenario.
1165/// `warnings` is seeded with any manifest-level notes (e.g. a defaulted
1166/// `base_mva`) and extended with the per-read fidelity notes.
1167// tap == 1.0 / != 0.0 reads are exact, not approximate; the builder is one long
1168// linear pass over the three tables, so the length is inherent.
1169#[allow(clippy::float_cmp, clippy::too_many_lines)]
1170fn build_network_from_columns(
1171    bus: &BusColumns,
1172    gens: &GenColumns,
1173    branch: &BranchColumns,
1174    scenario: i64,
1175    base_mva: f64,
1176    name: &str,
1177    mut warnings: Vec<String>,
1178) -> Result<GridfmRead> {
1179    // --- buses, loads, shunts (bus_data) ---
1180    let bus_rows = scenario_rows(&bus.scenario, scenario);
1181    if bus_rows.is_empty() {
1182        let mut avail = bus.scenario.clone();
1183        avail.sort_unstable();
1184        avail.dedup();
1185        return Err(Error::FormatRead {
1186            format: "gridfm",
1187            message: format!("scenario {scenario} not present; available: {avail:?}"),
1188        });
1189    }
1190
1191    let bus_id = &bus.bus;
1192    let pv = &bus.pv;
1193    let refc = &bus.refc;
1194    let vm = &bus.vm;
1195    let va = &bus.va;
1196    let vn_kv = &bus.vn_kv;
1197    let min_vm = &bus.min_vm;
1198    let max_vm = &bus.max_vm;
1199    let pd = &bus.pd;
1200    let qd = &bus.qd;
1201    let gs = &bus.gs;
1202    let bs = &bus.bs;
1203
1204    let mut buses = Vec::with_capacity(bus_rows.len());
1205    let mut loads = Vec::new();
1206    let mut shunts = Vec::new();
1207    // Dense bus index -> voltage magnitude, so a generator recovers its `vg`
1208    // setpoint from its bus (gridfm has no separate gen voltage column, but a
1209    // generator's setpoint is its bus's regulated `Vm`).
1210    let mut bus_vm: std::collections::HashMap<i64, f64> =
1211        std::collections::HashMap::with_capacity(bus_rows.len());
1212    for &r in &bus_rows {
1213        let id = dense_bus_id(bus_id[r])?;
1214        bus_vm.insert(bus_id[r], vm[r]);
1215        // REF / PV / PQ one-hot; the writer guarantees exactly one set, but read
1216        // defensively (REF wins, then PV, else PQ).
1217        let kind = if refc[r] != 0 {
1218            BusType::Ref
1219        } else if pv[r] != 0 {
1220            BusType::Pv
1221        } else {
1222            BusType::Pq
1223        };
1224        buses.push(Bus {
1225            id,
1226            kind,
1227            vm: vm[r],
1228            va: va[r],
1229            base_kv: vn_kv[r],
1230            vmax: max_vm[r],
1231            vmin: min_vm[r],
1232            area: 0,
1233            zone: 0,
1234            name: None,
1235            extras: Extras::new(),
1236        });
1237        if pd[r] != 0.0 || qd[r] != 0.0 {
1238            loads.push(Load {
1239                bus: id,
1240                p: pd[r],
1241                q: qd[r],
1242                in_service: true,
1243                extras: Extras::new(),
1244            });
1245        }
1246        // Undo the writer's `/ base_mva` (gridfm.rs:487) to recover MW/MVAr at V=1.
1247        if gs[r] != 0.0 || bs[r] != 0.0 {
1248            shunts.push(Shunt {
1249                bus: id,
1250                g: gs[r] * base_mva,
1251                b: bs[r] * base_mva,
1252                in_service: true,
1253                extras: Extras::new(),
1254            });
1255        }
1256    }
1257
1258    // --- generators (gen_data) ---
1259    let gen_rows = scenario_rows(&gens.scenario, scenario);
1260    require_scenario_block(&gens.scenario, scenario, &gen_rows, "gen_data")?;
1261    let g_bus = &gens.bus;
1262    let p_mw = &gens.p_mw;
1263    let q_mvar = &gens.q_mvar;
1264    let min_p = &gens.min_p;
1265    let max_p = &gens.max_p;
1266    let min_q = &gens.min_q;
1267    let max_q = &gens.max_q;
1268    let cp0 = &gens.cp0;
1269    let cp1 = &gens.cp1;
1270    let cp2 = &gens.cp2;
1271    let g_in = &gens.in_service;
1272
1273    let mut generators = Vec::with_capacity(gen_rows.len());
1274    for &r in &gen_rows {
1275        // Any nonzero coefficient is a real polynomial cost. An all-zero triple is
1276        // ambiguous in the schema — a generator with no cost, a genuine zero
1277        // polynomial cost, or a piecewise/cubic+ cost the writer couldn't represent
1278        // (all written as `(0, 0, 0)`) — so it reads back as `None` (see warnings).
1279        let cost = if cp0[r] != 0.0 || cp1[r] != 0.0 || cp2[r] != 0.0 {
1280            Some(GenCost {
1281                model: 2,
1282                startup: 0.0,
1283                shutdown: 0.0,
1284                ncost: 3,
1285                coeffs: vec![cp2[r], cp1[r], cp0[r]],
1286            })
1287        } else {
1288            None
1289        };
1290        generators.push(Generator {
1291            bus: dense_bus_id(g_bus[r])?,
1292            pg: p_mw[r],
1293            qg: q_mvar[r],
1294            pmax: max_p[r],
1295            pmin: min_p[r],
1296            qmax: max_q[r],
1297            qmin: min_q[r],
1298            // The schema has no gen vg; recover the setpoint from the bus's Vm
1299            // (falls back to 1.0 only if the gen references an absent bus, which
1300            // `validate()` below then rejects).
1301            vg: bus_vm.get(&g_bus[r]).copied().unwrap_or(1.0),
1302            mbase: base_mva,
1303            in_service: g_in[r] != 0,
1304            cost,
1305            caps: [None; 11],
1306        });
1307    }
1308
1309    // --- branches (branch_data); Y** and pf/qf/pt/qt are ignored ---
1310    let br_rows = scenario_rows(&branch.scenario, scenario);
1311    require_scenario_block(&branch.scenario, scenario, &br_rows, "branch_data")?;
1312    let from_bus = &branch.from_bus;
1313    let to_bus = &branch.to_bus;
1314    let r_col = &branch.r;
1315    let x_col = &branch.x;
1316    let b_col = &branch.b;
1317    let tap = &branch.tap;
1318    let shift = &branch.shift;
1319    let ang_min = &branch.ang_min;
1320    let ang_max = &branch.ang_max;
1321    let rate_a = &branch.rate_a;
1322    let br_status = &branch.status;
1323
1324    let mut branches = Vec::with_capacity(br_rows.len());
1325    // The writer stores the *effective* tap (`Branch::effective_tap`), so a line
1326    // (raw tap 0) lands as 1.0. Map unit tap + no shift back to the raw `tap == 0`
1327    // line convention, otherwise every line reads as a transformer
1328    // (`is_transformer` keys off `tap != 0`) and a read→write to a format that
1329    // splits lines from transformers (PSS/E, PowerWorld) misclassifies them. A
1330    // genuine unity-ratio, zero-shift transformer is electrically identical to a
1331    // line and is (unavoidably) read as one.
1332    let mut unit_tap_lines = 0usize;
1333    for &row in &br_rows {
1334        let shift_v = shift[row];
1335        let tap_out = if tap[row] == 1.0 && shift_v == 0.0 {
1336            unit_tap_lines += 1;
1337            0.0
1338        } else {
1339            tap[row]
1340        };
1341        branches.push(Branch {
1342            from: dense_bus_id(from_bus[row])?,
1343            to: dense_bus_id(to_bus[row])?,
1344            r: r_col[row],
1345            x: x_col[row],
1346            b: b_col[row],
1347            rate_a: rate_a[row],
1348            rate_b: 0.0,
1349            rate_c: 0.0,
1350            tap: tap_out,
1351            shift: shift_v,
1352            in_service: br_status[row] != 0,
1353            angmin: ang_min[row],
1354            angmax: ang_max[row],
1355            extras: Extras::new(),
1356        });
1357    }
1358
1359    let net = Network {
1360        name: name.to_string(),
1361        base_mva,
1362        buses,
1363        loads,
1364        shunts,
1365        branches,
1366        generators,
1367        storage: Vec::new(),
1368        hvdc: Vec::new(),
1369        source_format: SourceFormat::Gridfm,
1370        source: None,
1371    };
1372    net.validate()?;
1373
1374    // --- fidelity warnings: what the gridfm schema couldn't carry back ---
1375    warnings.push(format!(
1376        "synthesized bus ids 1..={}; original bus ids are not stored in a gridfm dataset, \
1377         so a written case is renumbered",
1378        net.buses.len()
1379    ));
1380    if !net.loads.is_empty() {
1381        warnings.push(format!(
1382            "folded nodal load into {} synthetic per-bus Load(s); per-load granularity is \
1383             not recoverable",
1384            net.loads.len()
1385        ));
1386    }
1387    if !net.shunts.is_empty() {
1388        warnings.push(format!(
1389            "folded nodal shunts into {} synthetic per-bus Shunt(s); per-shunt granularity \
1390             is not recoverable",
1391            net.shunts.len()
1392        ));
1393    }
1394    if unit_tap_lines > 0 {
1395        warnings.push(format!(
1396            "{unit_tap_lines} branch(es) had unit effective tap and no phase shift and were read \
1397             as lines (raw tap 0); a unity-ratio, zero-shift transformer in the source is \
1398             indistinguishable from a line and is read as one (the power flow is identical)"
1399        ));
1400    }
1401    let no_cost_gens = net.generators.iter().filter(|g| g.cost.is_none()).count();
1402    if no_cost_gens > 0 {
1403        warnings.push(format!(
1404            "{no_cost_gens} generator(s) read with no cost: an all-zero cost triple in the dataset \
1405             is the writer's encoding for a generator with no cost, a genuine zero polynomial \
1406             cost, or a piecewise/cubic+ cost it couldn't represent — indistinguishable on read"
1407        ));
1408    }
1409    warnings.push(
1410        "HVDC, storage, areas/zones, bus names, rate_b/rate_c, generator mbase/ramp limits, \
1411         and startup/shutdown costs are absent from the gridfm schema"
1412            .to_string(),
1413    );
1414
1415    Ok(GridfmRead {
1416        network: net,
1417        scenario,
1418        warnings,
1419    })
1420}
1421
1422// --- reader helpers --------------------------------------------------------
1423
1424/// Resolve the directory holding the gridfm parquet files, accepting (in order)
1425/// the leaf `raw/` dir itself, a `<case>/` dir with a `raw/` child, or a parent
1426/// dir with exactly one `*/raw/` child.
1427fn resolve_raw_dir(dir: &Path) -> Result<PathBuf> {
1428    let has_bus = |d: &Path| d.join("bus_data.parquet").is_file();
1429    if has_bus(dir) {
1430        return Ok(dir.to_path_buf());
1431    }
1432    let nested = dir.join("raw");
1433    if has_bus(&nested) {
1434        return Ok(nested);
1435    }
1436    // Parent-of-<case> fallback: scan for exactly one `*/raw/bus_data.parquet`.
1437    // Surface read_dir / entry IO errors (missing dir, permissions) rather than
1438    // masking them as "no dataset found".
1439    let mut matches: Vec<PathBuf> = Vec::new();
1440    let entries = std::fs::read_dir(dir).map_err(|e| Error::FormatRead {
1441        format: "gridfm",
1442        message: format!("reading directory {}: {e}", dir.display()),
1443    })?;
1444    for entry in entries {
1445        let entry = entry.map_err(|e| Error::FormatRead {
1446            format: "gridfm",
1447            message: format!("reading an entry of {}: {e}", dir.display()),
1448        })?;
1449        let raw = entry.path().join("raw");
1450        if has_bus(&raw) {
1451            matches.push(raw);
1452        }
1453    }
1454    match matches.len() {
1455        1 => Ok(matches.pop().expect("len checked")),
1456        0 => Err(Error::FormatRead {
1457            format: "gridfm",
1458            message: format!(
1459                "no gridfm dataset under {}; expected bus_data.parquet in the directory, a \
1460                 raw/ child, or a single <case>/raw/ child",
1461                dir.display()
1462            ),
1463        }),
1464        n => Err(Error::FormatRead {
1465            format: "gridfm",
1466            message: format!(
1467                "{n} gridfm datasets under {}; point at the specific <case>/raw directory",
1468                dir.display()
1469            ),
1470        }),
1471    }
1472}
1473
1474/// Read `base_mva` and the case name from `raw/gridfm_meta.json`. A missing or
1475/// unreadable manifest is not fatal — a bare prediction may lack one — so default
1476/// `base_mva` to 100, derive the name from the `<case>/raw` parent, and warn.
1477fn read_meta(raw: &Path) -> (f64, String, Vec<String>) {
1478    let case_from_path = || {
1479        raw.parent()
1480            .and_then(Path::file_name)
1481            .and_then(|s| s.to_str())
1482            .map_or_else(|| "gridfm".to_string(), str::to_string)
1483    };
1484    let text = match std::fs::read_to_string(raw.join("gridfm_meta.json")) {
1485        Ok(text) => text,
1486        // Not just "not found": surface the underlying error (permissions, non-UTF-8,
1487        // etc.) so a dataset problem is diagnosable, while still defaulting base_mva.
1488        Err(e) => {
1489            return (
1490                100.0,
1491                case_from_path(),
1492                vec![format!(
1493                    "gridfm_meta.json could not be read ({e}); base_mva defaulted to 100"
1494                )],
1495            );
1496        }
1497    };
1498    let Ok(meta) = serde_json::from_str::<serde_json::Value>(&text) else {
1499        return (
1500            100.0,
1501            case_from_path(),
1502            vec!["gridfm_meta.json is not valid JSON; base_mva defaulted to 100".to_string()],
1503        );
1504    };
1505    let name = meta
1506        .get("case_name")
1507        .and_then(serde_json::Value::as_str)
1508        .map_or_else(case_from_path, str::to_string);
1509    let mut warnings = Vec::new();
1510    // base_mva must be a positive, finite number; anything else (absent, zero,
1511    // negative, NaN) defaults to 100 with a note — shunt recovery scales by it.
1512    let base = match meta.get("base_mva").and_then(serde_json::Value::as_f64) {
1513        Some(b) if b.is_finite() && b > 0.0 => b,
1514        _ => {
1515            warnings.push(
1516                "gridfm_meta.json has no usable base_mva (absent or not a positive number); \
1517                 defaulted to 100"
1518                    .to_string(),
1519            );
1520            100.0
1521        }
1522    };
1523    (base, name, warnings)
1524}
1525
1526/// Open a parquet file and collect every record batch (one for powerio-written
1527/// data, possibly several for an externally-written prediction).
1528fn read_parquet(path: &Path) -> Result<Vec<RecordBatch>> {
1529    let file = std::fs::File::open(path).map_err(|e| Error::FormatRead {
1530        format: "gridfm",
1531        message: format!("opening {}: {e}", path.display()),
1532    })?;
1533    let reader = ParquetRecordBatchReaderBuilder::try_new(file)
1534        .and_then(ParquetRecordBatchReaderBuilder::build)
1535        .map_err(|e| Error::FormatRead {
1536            format: "gridfm",
1537            message: format!("reading {}: {e}", path.display()),
1538        })?;
1539    reader
1540        .collect::<std::result::Result<Vec<_>, _>>()
1541        .map_err(|e| Error::FormatRead {
1542            format: "gridfm",
1543            message: format!("decoding {}: {e}", path.display()),
1544        })
1545}
1546
1547/// Row indices whose `scenario` column equals `scenario`, in table order.
1548fn scenario_rows(scen: &[i64], scenario: i64) -> Vec<usize> {
1549    scen.iter()
1550        .enumerate()
1551        .filter_map(|(i, &s)| (s == scenario).then_some(i))
1552        .collect()
1553}
1554
1555/// Guard a gen/branch table: empty `rows` is fine when the whole table is empty
1556/// (a case legitimately has no generators, or no branches), but if the table
1557/// holds rows for *other* scenarios yet none for this one, the dataset is partial
1558/// or corrupt and would silently yield a wrong-but-valid network — error instead.
1559fn require_scenario_block(
1560    scen_col: &[i64],
1561    scenario: i64,
1562    rows: &[usize],
1563    table: &str,
1564) -> Result<()> {
1565    if rows.is_empty() && !scen_col.is_empty() {
1566        return Err(Error::FormatRead {
1567            format: "gridfm",
1568            message: format!(
1569                "scenario {scenario} has no {table} rows, but the table holds {} row(s) for other \
1570                 scenarios — a partial or corrupt dataset",
1571                scen_col.len()
1572            ),
1573        });
1574    }
1575    Ok(())
1576}
1577
1578/// A dense `[0, n)` parquet index → 1-based [`BusId`]. Errors on a negative index.
1579fn dense_bus_id(v: i64) -> Result<BusId> {
1580    let idx = usize::try_from(v).map_err(|_| Error::FormatRead {
1581        format: "gridfm",
1582        message: format!("negative dense bus index {v}"),
1583    })?;
1584    Ok(BusId(idx + 1))
1585}
1586
1587/// Look up a named column, erroring if absent.
1588fn column<'a>(b: &'a RecordBatch, name: &str) -> Result<&'a ArrayRef> {
1589    b.column_by_name(name).ok_or_else(|| Error::FormatRead {
1590        format: "gridfm",
1591        message: format!("missing column `{name}`"),
1592    })
1593}
1594
1595/// Concatenate a named non-null `Int64` column across all batches.
1596fn i64_col(batches: &[RecordBatch], name: &str) -> Result<Vec<i64>> {
1597    let mut out = Vec::with_capacity(batches.iter().map(RecordBatch::num_rows).sum());
1598    for b in batches {
1599        let arr = column(b, name)?;
1600        let col = arr
1601            .as_any()
1602            .downcast_ref::<Int64Array>()
1603            .ok_or_else(|| Error::FormatRead {
1604                format: "gridfm",
1605                message: format!("column `{name}` is not Int64"),
1606            })?;
1607        if col.null_count() > 0 {
1608            return Err(Error::FormatRead {
1609                format: "gridfm",
1610                message: format!("column `{name}` has nulls"),
1611            });
1612        }
1613        out.extend_from_slice(col.values());
1614    }
1615    Ok(out)
1616}
1617
1618/// Concatenate a named non-null `Float64` column across all batches.
1619fn f64_col(batches: &[RecordBatch], name: &str) -> Result<Vec<f64>> {
1620    let mut out = Vec::with_capacity(batches.iter().map(RecordBatch::num_rows).sum());
1621    for b in batches {
1622        let arr = column(b, name)?;
1623        let col = arr
1624            .as_any()
1625            .downcast_ref::<Float64Array>()
1626            .ok_or_else(|| Error::FormatRead {
1627                format: "gridfm",
1628                message: format!("column `{name}` is not Float64"),
1629            })?;
1630        if col.null_count() > 0 {
1631            return Err(Error::FormatRead {
1632                format: "gridfm",
1633                message: format!("column `{name}` has nulls"),
1634            });
1635        }
1636        out.extend_from_slice(col.values());
1637    }
1638    Ok(out)
1639}
1640
1641#[cfg(test)]
1642mod tests {
1643    use super::*;
1644    use crate::network::{Branch, Bus, BusId, Extras, Generator};
1645    use arrow::array::{Float64Array, Int64Array};
1646    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
1647
1648    const BUS_COLS: &[&str] = &[
1649        "scenario",
1650        "load_scenario_idx",
1651        "bus",
1652        "Pd",
1653        "Qd",
1654        "Pg",
1655        "Qg",
1656        "Vm",
1657        "Va",
1658        "PQ",
1659        "PV",
1660        "REF",
1661        "vn_kv",
1662        "min_vm_pu",
1663        "max_vm_pu",
1664        "GS",
1665        "BS",
1666    ];
1667    const GEN_COLS: &[&str] = &[
1668        "scenario",
1669        "load_scenario_idx",
1670        "idx",
1671        "bus",
1672        "p_mw",
1673        "q_mvar",
1674        "min_p_mw",
1675        "max_p_mw",
1676        "min_q_mvar",
1677        "max_q_mvar",
1678        "cp0_eur",
1679        "cp1_eur_per_mw",
1680        "cp2_eur_per_mw2",
1681        "in_service",
1682        "is_slack_gen",
1683    ];
1684    const BRANCH_COLS: &[&str] = &[
1685        "scenario",
1686        "load_scenario_idx",
1687        "idx",
1688        "from_bus",
1689        "to_bus",
1690        "pf",
1691        "qf",
1692        "pt",
1693        "qt",
1694        "r",
1695        "x",
1696        "b",
1697        "Yff_r",
1698        "Yff_i",
1699        "Yft_r",
1700        "Yft_i",
1701        "Ytf_r",
1702        "Ytf_i",
1703        "Ytt_r",
1704        "Ytt_i",
1705        "tap",
1706        "shift",
1707        "ang_min",
1708        "ang_max",
1709        "rate_a",
1710        "br_status",
1711    ];
1712    const YBUS_COLS: &[&str] = &[
1713        "scenario",
1714        "load_scenario_idx",
1715        "index1",
1716        "index2",
1717        "G",
1718        "B",
1719    ];
1720
1721    fn case14() -> Network {
1722        let path = concat!(env!("CARGO_MANIFEST_DIR"), "/../tests/data/case14.m");
1723        crate::parse_matpower_file(path).unwrap()
1724    }
1725
1726    fn names(b: &RecordBatch) -> Vec<String> {
1727        b.schema()
1728            .fields()
1729            .iter()
1730            .map(|f| f.name().clone())
1731            .collect()
1732    }
1733
1734    fn col_i64<'a>(b: &'a RecordBatch, name: &str) -> &'a Int64Array {
1735        b.column_by_name(name)
1736            .unwrap()
1737            .as_any()
1738            .downcast_ref()
1739            .unwrap()
1740    }
1741
1742    fn col_f64<'a>(b: &'a RecordBatch, name: &str) -> &'a Float64Array {
1743        b.column_by_name(name)
1744            .unwrap()
1745            .as_any()
1746            .downcast_ref()
1747            .unwrap()
1748    }
1749
1750    fn read(path: &Path) -> RecordBatch {
1751        let file = std::fs::File::open(path).unwrap();
1752        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
1753            .unwrap()
1754            .build()
1755            .unwrap();
1756        // case14 fits in one row group / batch.
1757        reader.next().unwrap().unwrap()
1758    }
1759
1760    #[test]
1761    fn schema_and_row_counts_match_case14() {
1762        let net = case14();
1763        let tables = gridfm_record_batches(&net, 0, &GridfmOptions::default()).unwrap();
1764
1765        assert_eq!(names(&tables.bus), BUS_COLS);
1766        assert_eq!(names(&tables.generator), GEN_COLS);
1767        assert_eq!(names(&tables.branch), BRANCH_COLS);
1768        assert_eq!(names(tables.y_bus.as_ref().unwrap()), YBUS_COLS);
1769
1770        assert_eq!(tables.bus.num_rows(), net.buses.len()); // 14
1771        assert_eq!(tables.generator.num_rows(), net.generators.len()); // 5
1772        assert_eq!(tables.branch.num_rows(), net.branches.len()); // 20
1773    }
1774
1775    #[test]
1776    fn parquet_round_trips_through_reader() {
1777        let net = case14();
1778        let dir = tempfile::tempdir().unwrap();
1779        let out = write_gridfm_dataset(&net, 0, dir.path(), &GridfmOptions::default()).unwrap();
1780
1781        let raw = dir.path().join("case14").join("raw");
1782        assert_eq!(out.dir, raw);
1783        for f in ["bus_data", "gen_data", "branch_data", "y_bus_data"] {
1784            assert!(raw.join(format!("{f}.parquet")).is_file(), "missing {f}");
1785        }
1786        assert!(raw.join("gridfm_meta.json").is_file());
1787
1788        let bus = read(&raw.join("bus_data.parquet"));
1789        assert_eq!(names(&bus), BUS_COLS);
1790        assert_eq!(bus.num_rows(), net.buses.len());
1791        assert_eq!(names(&read(&raw.join("gen_data.parquet"))), GEN_COLS);
1792        assert_eq!(names(&read(&raw.join("branch_data.parquet"))), BRANCH_COLS);
1793        assert_eq!(names(&read(&raw.join("y_bus_data.parquet"))), YBUS_COLS);
1794    }
1795
1796    #[test]
1797    fn bus_table_values_are_consistent() {
1798        let net = case14();
1799        let view = IndexedNetwork::new(&net);
1800        let tables = gridfm_record_batches(&net, 0, &GridfmOptions::default()).unwrap();
1801        let bus = &tables.bus;
1802
1803        // Exactly one reference bus; PQ/PV/REF partition every bus.
1804        let (pq, pv, r) = (col_i64(bus, "PQ"), col_i64(bus, "PV"), col_i64(bus, "REF"));
1805        assert_eq!(r.values().iter().sum::<i64>(), 1);
1806        for i in 0..bus.num_rows() {
1807            assert_eq!(pq.value(i) + pv.value(i) + r.value(i), 1);
1808        }
1809
1810        // GS/BS are the per-bus shunt aggregate divided by base_mva.
1811        let base = net.base_mva;
1812        let gs = col_f64(bus, "GS");
1813        for i in 0..bus.num_rows() {
1814            assert!((gs.value(i) - view.gs()[i] / base).abs() < 1e-12);
1815        }
1816
1817        // `bus` column is the dense 0..n range.
1818        let bus_idx = col_i64(bus, "bus");
1819        for i in 0..bus.num_rows() {
1820            assert_eq!(bus_idx.value(i), i as i64);
1821        }
1822    }
1823
1824    #[test]
1825    fn branch_admittance_columns_match_build_ybus() {
1826        // The branch table's Y** columns are the same kernel build_ybus scatters,
1827        // so a known in-service branch's block must equal branch_admittance.
1828        let net = case14();
1829        let tables = gridfm_record_batches(&net, 0, &GridfmOptions::default()).unwrap();
1830        let br = &tables.branch;
1831
1832        let yff_r = col_f64(br, "Yff_r");
1833        let yff_i = col_f64(br, "Yff_i");
1834        for (row, branch) in net.branches.iter().enumerate() {
1835            // Raw fixture, so the shift is in degrees — convert as build_ybus does.
1836            let shift_rad = branch.shift.to_radians();
1837            if let Some(block) =
1838                branch_admittance(branch, YbusFlags::default(), shift_rad, row).unwrap()
1839            {
1840                assert!((yff_r.value(row) - block[0].re).abs() < 1e-12);
1841                assert!((yff_i.value(row) - block[0].im).abs() < 1e-12);
1842            }
1843        }
1844    }
1845
1846    #[test]
1847    fn is_slack_gen_marks_the_reference_bus() {
1848        let net = case14();
1849        let view = IndexedNetwork::new(&net);
1850        let ref_bus = view.reference_bus_index().unwrap();
1851        let tables = gridfm_record_batches(&net, 0, &GridfmOptions::default()).unwrap();
1852        let g = &tables.generator;
1853
1854        let bus = col_i64(g, "bus");
1855        let slack = col_i64(g, "is_slack_gen");
1856        for i in 0..g.num_rows() {
1857            assert_eq!(slack.value(i) == 1, bus.value(i) as usize == ref_bus);
1858        }
1859        assert!(slack.values().contains(&1), "no slack generator");
1860    }
1861
1862    #[test]
1863    fn branch_flows_close_the_power_balance_on_a_solved_case() {
1864        // case14 ships a converged operating point, so the active flows must obey
1865        // KCL: total branch loss = generation - demand - shunt draw. This is the
1866        // value-level guard on branch_flows (a missing ×base, a sign flip, or a
1867        // wrong conj would break it). Every branch's real loss is also ≥ 0.
1868        let net = case14();
1869        let view = IndexedNetwork::new(&net);
1870        let tables = gridfm_record_batches(&net, 0, &GridfmOptions::default()).unwrap();
1871        let br = &tables.branch;
1872        let (pf, pt, status) = (
1873            col_f64(br, "pf"),
1874            col_f64(br, "pt"),
1875            col_i64(br, "br_status"),
1876        );
1877
1878        let mut loss = 0.0;
1879        for i in 0..br.num_rows() {
1880            if status.value(i) == 1 {
1881                let l = pf.value(i) + pt.value(i);
1882                assert!(l >= -1e-6, "branch {i} has negative real loss {l}");
1883                loss += l;
1884            }
1885        }
1886        assert!(loss > 1.0, "case14 has ~13 MW of real loss, got {loss}");
1887
1888        let gen_p: f64 = net
1889            .generators
1890            .iter()
1891            .filter(|g| g.in_service)
1892            .map(|g| g.pg)
1893            .sum();
1894        let load_p: f64 = net.loads.iter().map(|l| l.p).sum();
1895        // Real shunt draw at the stored voltages: gs() is MW at V = 1.
1896        let shunt_p: f64 = (0..view.n())
1897            .map(|i| view.gs()[i] * net.buses[i].vm.powi(2))
1898            .sum();
1899        assert!(
1900            (loss - (gen_p - load_p - shunt_p)).abs() < 0.5,
1901            "power balance off: loss {loss} vs gen-load-shunt {}",
1902            gen_p - load_p - shunt_p
1903        );
1904    }
1905
1906    #[test]
1907    fn zero_impedance_branch_zeros_columns_and_is_counted() {
1908        // No vendored fixture has r = x = 0, so build one: branch 0 is a zero-
1909        // impedance tie, branch 1 is normal. The tie's admittance and flow columns
1910        // must be zero (never NaN), and the manifest must count it.
1911        let net = Network::in_memory(
1912            "zeroimp",
1913            100.0,
1914            vec![
1915                bus(1, BusType::Ref),
1916                bus(2, BusType::Pq),
1917                bus(3, BusType::Pq),
1918            ],
1919            vec![branch(1, 2, 0.0, 0.0), branch(2, 3, 0.01, 0.1)],
1920        );
1921        let tables = gridfm_record_batches(&net, 0, &GridfmOptions::default()).unwrap();
1922        let br = &tables.branch;
1923        for col in [
1924            "Yff_r", "Yff_i", "Yft_r", "Yft_i", "Ytf_r", "Ytf_i", "Ytt_r", "Ytt_i", "pf", "qf",
1925            "pt", "qt",
1926        ] {
1927            let v = col_f64(br, col).value(0);
1928            assert!(
1929                v == 0.0,
1930                "{col} should be 0 for the zero-impedance branch, got {v}"
1931            );
1932        }
1933
1934        let dir = tempfile::tempdir().unwrap();
1935        let out = write_gridfm_dataset(&net, 0, dir.path(), &GridfmOptions::default()).unwrap();
1936        assert_eq!(out.dropped_zero_impedance, 1);
1937        let meta: serde_json::Value = serde_json::from_str(
1938            &std::fs::read_to_string(out.dir.join("gridfm_meta.json")).unwrap(),
1939        )
1940        .unwrap();
1941        assert_eq!(meta["dropped_zero_impedance"], 1);
1942    }
1943
1944    #[test]
1945    fn gridfm_cost_maps_every_arm_to_raw_coefficients() {
1946        // Polynomial coeffs are highest-order first: [c2, c1, c0] -> (cp0, cp1, cp2).
1947        assert_eq!(
1948            gridfm_cost(Some(&gencost(2, 3, vec![2.0, 3.0, 4.0]))),
1949            (4.0, 3.0, 2.0)
1950        );
1951        assert_eq!(
1952            gridfm_cost(Some(&gencost(2, 2, vec![3.0, 4.0]))),
1953            (4.0, 3.0, 0.0)
1954        );
1955        assert_eq!(
1956            gridfm_cost(Some(&gencost(2, 1, vec![4.0]))),
1957            (4.0, 0.0, 0.0)
1958        );
1959        // Unrepresentable rows collapse to zeros and report as not representable.
1960        let piecewise = gencost(1, 2, vec![0.0, 0.0, 1.0, 1.0]);
1961        let malformed = gencost(2, 3, vec![1.0]); // fewer coeffs than ncost claims
1962        assert_eq!(gridfm_cost(Some(&piecewise)), (0.0, 0.0, 0.0));
1963        assert_eq!(gridfm_cost(Some(&malformed)), (0.0, 0.0, 0.0));
1964        assert_eq!(gridfm_cost(None), (0.0, 0.0, 0.0));
1965        assert!(!cost_representable(Some(&piecewise)));
1966        assert!(!cost_representable(Some(&malformed)));
1967        assert!(!cost_representable(None));
1968        assert!(cost_representable(Some(&gencost(
1969            2,
1970            3,
1971            vec![1.0, 2.0, 3.0]
1972        ))));
1973    }
1974
1975    #[test]
1976    fn missing_reference_bus_errors() {
1977        // gridfm_record_batches' documented precondition: exactly one ref bus.
1978        let net = Network::in_memory(
1979            "noref",
1980            100.0,
1981            vec![bus(1, BusType::Pq), bus(2, BusType::Pq)],
1982            vec![branch(1, 2, 0.01, 0.1)],
1983        );
1984        let err = gridfm_record_batches(&net, 0, &GridfmOptions::default()).unwrap_err();
1985        assert!(
1986            matches!(err, Error::ReferenceBusCount { .. }),
1987            "got {err:?}"
1988        );
1989    }
1990
1991    #[test]
1992    fn non_finite_bus_voltage_errors_before_parquet() {
1993        let mut net = case14();
1994        net.buses[0].vm = f64::NAN;
1995        let err = gridfm_record_batches(&net, 7, &GridfmOptions::default()).unwrap_err();
1996        match err {
1997            Error::NonFiniteGridfmValue {
1998                scenario,
1999                element,
2000                row,
2001                field,
2002                value,
2003            } => {
2004                assert_eq!(scenario, 7);
2005                assert_eq!(element, "bus");
2006                assert_eq!(row, 0);
2007                assert_eq!(field, "vm");
2008                assert!(value.is_nan());
2009            }
2010            other => panic!("expected NonFiniteGridfmValue, got {other:?}"),
2011        }
2012    }
2013
2014    #[test]
2015    fn non_finite_tap_errors_even_without_y_bus_table() {
2016        let mut net = case14();
2017        net.branches[0].tap = f64::NAN;
2018        let opts = GridfmOptions {
2019            include_y_bus: false,
2020            ..Default::default()
2021        };
2022        let err = gridfm_record_batches(&net, 0, &opts).unwrap_err();
2023        assert!(
2024            matches!(
2025                err,
2026                Error::NonFiniteGridfmValue {
2027                    element: "branch",
2028                    row: 0,
2029                    field: "tap",
2030                    ..
2031                }
2032            ),
2033            "got {err:?}"
2034        );
2035    }
2036
2037    #[test]
2038    fn normalized_snapshot_is_rejected_in_release_builds() {
2039        let net = case14().to_normalized().unwrap();
2040        let err = gridfm_record_batches(&net, 3, &GridfmOptions::default()).unwrap_err();
2041        assert!(
2042            matches!(err, Error::NormalizedGridfmSnapshot { scenario: 3 }),
2043            "got {err:?}"
2044        );
2045    }
2046
2047    #[test]
2048    fn non_finite_representable_cost_errors() {
2049        let mut net = Network::in_memory(
2050            "badcost",
2051            100.0,
2052            vec![bus(1, BusType::Ref), bus(2, BusType::Pq)],
2053            vec![branch(1, 2, 0.01, 0.1)],
2054        );
2055        net.generators
2056            .push(gen_at(1, gencost(2, 3, vec![f64::NAN, 1.0, 0.0])));
2057
2058        // coeffs are highest-order first, so the NaN lands in the cp2 column.
2059        let err = gridfm_record_batches(&net, 0, &GridfmOptions::default()).unwrap_err();
2060        assert!(
2061            matches!(
2062                err,
2063                Error::NonFiniteGridfmValue {
2064                    element: "gencost",
2065                    row: 0,
2066                    field: "cp2",
2067                    ..
2068                }
2069            ),
2070            "got {err:?}"
2071        );
2072    }
2073
2074    #[test]
2075    fn unbounded_limits_export_as_infinity() {
2076        // PowerModels defaults absent gen limits to ±Inf and MATPOWER carries
2077        // literal Inf tokens; the unbounded sentinel must reach the Parquet
2078        // columns, not abort the export (only NaN is corrupt in a limit).
2079        let mut net = case14();
2080        net.generators[0].qmax = f64::INFINITY;
2081        net.generators[0].qmin = f64::NEG_INFINITY;
2082        net.generators[1].pmax = f64::INFINITY;
2083        net.branches[0].angmin = f64::NEG_INFINITY;
2084        net.branches[0].angmax = f64::INFINITY;
2085        net.branches[1].rate_a = f64::INFINITY;
2086        net.buses[0].vmax = f64::INFINITY;
2087        let tables = gridfm_record_batches(&net, 0, &GridfmOptions::default()).unwrap();
2088        let qmax = col_f64(&tables.generator, "max_q_mvar");
2089        assert!(qmax.value(0).is_infinite() && qmax.value(0) > 0.0);
2090    }
2091
2092    #[test]
2093    fn nan_limit_still_errors() {
2094        let mut net = case14();
2095        net.generators[0].qmax = f64::NAN;
2096        let err = gridfm_record_batches(&net, 0, &GridfmOptions::default()).unwrap_err();
2097        assert!(
2098            matches!(
2099                err,
2100                Error::NonFiniteGridfmValue {
2101                    element: "generator",
2102                    field: "qmax",
2103                    ..
2104                }
2105            ),
2106            "got {err:?}"
2107        );
2108    }
2109
2110    /// case14 with every load and generator setpoint scaled — a perturbed
2111    /// operating point on the same topology, the scenario-batch contract.
2112    fn scaled(net: &Network, factor: f64) -> Network {
2113        let mut s = net.clone();
2114        for l in &mut s.loads {
2115            l.p *= factor;
2116            l.q *= factor;
2117        }
2118        for g in &mut s.generators {
2119            g.pg *= factor;
2120            g.qg *= factor;
2121        }
2122        s
2123    }
2124
2125    #[test]
2126    fn batch_stacks_scenarios_keyed_by_scenario_column() {
2127        let base = case14();
2128        let up = scaled(&base, 1.1);
2129        let down = scaled(&base, 0.9);
2130        let snaps = [
2131            GridfmSnapshot {
2132                net: &base,
2133                scenario: 0,
2134            },
2135            GridfmSnapshot {
2136                net: &up,
2137                scenario: 1,
2138            },
2139            GridfmSnapshot {
2140                net: &down,
2141                scenario: 2,
2142            },
2143        ];
2144        let tables = gridfm_record_batches_batch(&snaps, &GridfmOptions::default()).unwrap();
2145
2146        // Schema is unchanged; rows are 3× the single-snapshot counts.
2147        assert_eq!(names(&tables.bus), BUS_COLS);
2148        assert_eq!(names(&tables.branch), BRANCH_COLS);
2149        assert_eq!(tables.bus.num_rows(), 3 * base.buses.len());
2150        assert_eq!(tables.generator.num_rows(), 3 * base.generators.len());
2151        assert_eq!(tables.branch.num_rows(), 3 * base.branches.len());
2152
2153        // The scenario column is blocked 0.., and the dense bus index resets to
2154        // 0..n within each scenario.
2155        let n = base.buses.len();
2156        let scen = col_i64(&tables.bus, "scenario");
2157        let lsi = col_i64(&tables.bus, "load_scenario_idx");
2158        let bus_idx = col_i64(&tables.bus, "bus");
2159        for k in 0..3 {
2160            for i in 0..n {
2161                let row = k * n + i;
2162                assert_eq!(scen.value(row), k as i64);
2163                assert_eq!(lsi.value(row), k as i64);
2164                assert_eq!(bus_idx.value(row), i as i64);
2165            }
2166        }
2167
2168        // The first scenario's rows match the standalone single-case tables, so
2169        // batching is a pure row-stack over the established single-snapshot path.
2170        // Compare every column bit-exactly (not just one), so a per-column offset
2171        // or ordering regression in the row-stack can't slip through.
2172        let single = gridfm_record_batches(&base, 0, &GridfmOptions::default()).unwrap();
2173        let bit_exact = |b: &RecordBatch, s: &RecordBatch, col: &str, rows: usize| {
2174            let (bb, ss) = (col_f64(b, col), col_f64(s, col));
2175            for i in 0..rows {
2176                assert_eq!(
2177                    bb.value(i).to_bits(),
2178                    ss.value(i).to_bits(),
2179                    "scenario-0 {col}[{i}] differs from the single-case path"
2180                );
2181            }
2182        };
2183        for col in ["Pd", "Qd", "Pg", "Qg", "Vm", "Va", "GS", "BS"] {
2184            bit_exact(&tables.bus, &single.bus, col, n);
2185        }
2186        bit_exact(
2187            &tables.generator,
2188            &single.generator,
2189            "p_mw",
2190            base.generators.len(),
2191        );
2192        bit_exact(&tables.branch, &single.branch, "pf", base.branches.len());
2193
2194        // The perturbed scenario's load really differs (guards against stamping
2195        // the same network three times).
2196        let pd_batch = col_f64(&tables.bus, "Pd");
2197        let pd_single = col_f64(&single.bus, "Pd");
2198        assert!((pd_batch.value(n) - 1.1 * pd_single.value(0)).abs() < 1e-9);
2199    }
2200
2201    #[test]
2202    fn batch_dataset_writes_stacked_parquet_with_scenario_count() {
2203        let base = case14();
2204        let up = scaled(&base, 1.25);
2205        let snaps = [
2206            GridfmSnapshot {
2207                net: &base,
2208                scenario: 0,
2209            },
2210            GridfmSnapshot {
2211                net: &up,
2212                scenario: 1,
2213            },
2214        ];
2215        let dir = tempfile::tempdir().unwrap();
2216        let out = write_gridfm_batch(&snaps, dir.path(), &GridfmOptions::default()).unwrap();
2217
2218        let bus = read(&out.dir.join("bus_data.parquet"));
2219        assert_eq!(bus.num_rows(), 2 * base.buses.len());
2220        let scen = col_i64(&bus, "scenario");
2221        assert_eq!(scen.value(0), 0);
2222        assert_eq!(scen.value(base.buses.len()), 1);
2223
2224        let meta: serde_json::Value = serde_json::from_str(
2225            &std::fs::read_to_string(out.dir.join("gridfm_meta.json")).unwrap(),
2226        )
2227        .unwrap();
2228        assert_eq!(meta["n_scenarios"], 2);
2229        assert_eq!(meta["scenario"], 0);
2230    }
2231
2232    #[test]
2233    fn empty_batch_errors() {
2234        let err = gridfm_record_batches_batch(&[], &GridfmOptions::default()).unwrap_err();
2235        assert!(matches!(err, Error::EmptyScenarioBatch), "got {err:?}");
2236    }
2237
2238    #[test]
2239    fn shape_mismatch_across_snapshots_errors() {
2240        let big = case14();
2241        let small = Network::in_memory(
2242            "small",
2243            100.0,
2244            vec![bus(1, BusType::Ref), bus(2, BusType::Pq)],
2245            vec![branch(1, 2, 0.01, 0.1)],
2246        );
2247        let snaps = [
2248            GridfmSnapshot {
2249                net: &big,
2250                scenario: 0,
2251            },
2252            GridfmSnapshot {
2253                net: &small,
2254                scenario: 1,
2255            },
2256        ];
2257        let err = gridfm_record_batches_batch(&snaps, &GridfmOptions::default()).unwrap_err();
2258        assert!(
2259            matches!(
2260                err,
2261                Error::ScenarioShapeMismatch {
2262                    index: 1,
2263                    reason: ScenarioMismatch::Counts { .. }
2264                }
2265            ),
2266            "got {err:?}"
2267        );
2268    }
2269
2270    #[test]
2271    fn bus_order_mismatch_is_reported_distinctly() {
2272        // Same counts and the same bus-id set, but a different ordering: the dense
2273        // bus index would mean different buses across snapshots, so the batch is
2274        // rejected with the BusOrder reason (not the same-tuple Counts message).
2275        let base = case14();
2276        let mut reordered = base.clone();
2277        reordered.buses.swap(0, 1);
2278        let snaps = [
2279            GridfmSnapshot {
2280                net: &base,
2281                scenario: 0,
2282            },
2283            GridfmSnapshot {
2284                net: &reordered,
2285                scenario: 1,
2286            },
2287        ];
2288        let err = gridfm_record_batches_batch(&snaps, &GridfmOptions::default()).unwrap_err();
2289        assert!(
2290            matches!(
2291                err,
2292                Error::ScenarioShapeMismatch {
2293                    index: 1,
2294                    reason: ScenarioMismatch::BusOrder
2295                }
2296            ),
2297            "got {err:?}"
2298        );
2299    }
2300
2301    #[test]
2302    fn manifest_counts_sum_over_the_batch() {
2303        // Two snapshots on the same element set, but only the second has a zeroed
2304        // branch impedance. The manifest's dropped count describes the whole
2305        // dataset (a total of 1), not just the first snapshot — branch status and
2306        // impedance may legitimately differ per scenario.
2307        let base = case14();
2308        let mut perturbed = base.clone();
2309        perturbed.branches[0].r = 0.0;
2310        perturbed.branches[0].x = 0.0;
2311        let snaps = [
2312            GridfmSnapshot {
2313                net: &base,
2314                scenario: 0,
2315            },
2316            GridfmSnapshot {
2317                net: &perturbed,
2318                scenario: 1,
2319            },
2320        ];
2321        let dir = tempfile::tempdir().unwrap();
2322        let out = write_gridfm_batch(&snaps, dir.path(), &GridfmOptions::default()).unwrap();
2323        assert_eq!(out.dropped_zero_impedance, 1);
2324        let meta: serde_json::Value = serde_json::from_str(
2325            &std::fs::read_to_string(out.dir.join("gridfm_meta.json")).unwrap(),
2326        )
2327        .unwrap();
2328        assert_eq!(meta["dropped_zero_impedance"], 1);
2329    }
2330
2331    #[test]
2332    fn y_bus_table_is_absent_when_disabled() {
2333        let net = case14();
2334        let opts = GridfmOptions {
2335            include_y_bus: false,
2336            ..Default::default()
2337        };
2338        let tables = gridfm_record_batches(&net, 0, &opts).unwrap();
2339        assert!(tables.y_bus.is_none(), "y_bus should not be built");
2340
2341        let dir = tempfile::tempdir().unwrap();
2342        let out = write_gridfm_dataset(&net, 0, dir.path(), &opts).unwrap();
2343        assert!(
2344            !out.dir.join("y_bus_data.parquet").exists(),
2345            "y_bus_data.parquet should not be written"
2346        );
2347    }
2348
2349    #[test]
2350    fn numbered_snapshots_stamps_base_plus_k_and_checks_overflow() {
2351        // The shared builder both bindings use: the k-th network is scenario
2352        // `base + k`, in order.
2353        let net = case14();
2354        let snaps = numbered_snapshots(&[&net, &net, &net], 5).unwrap();
2355        assert_eq!(snaps.len(), 3);
2356        assert_eq!(snaps[0].scenario, 5);
2357        assert_eq!(snaps[1].scenario, 6);
2358        assert_eq!(snaps[2].scenario, 7);
2359
2360        // Overflow is checked (not wrapped to a negative id, not a panic) and names
2361        // the offending index.
2362        let err = numbered_snapshots(&[&net, &net], i64::MAX).unwrap_err();
2363        assert!(
2364            matches!(err, Error::ScenarioIdOverflow { index: 1, .. }),
2365            "got {err:?}"
2366        );
2367    }
2368
2369    #[test]
2370    fn out_of_service_generator_is_listed_but_excluded_from_bus_aggregate() {
2371        // Two paths react to `g.in_service`: gen_data emits an `in_service` column
2372        // for every generator (keeping its setpoint), while bus `Pg`/`Qg` aggregate
2373        // only in-service generation (`view.in_service_gens()`). Exercise the
2374        // `false` case on both.
2375        let mut net = Network::in_memory(
2376            "genoutage",
2377            100.0,
2378            vec![bus(1, BusType::Ref), bus(2, BusType::Pq)],
2379            vec![branch(1, 2, 0.01, 0.1)],
2380        );
2381        let mut g_on = gen_at(1, gencost(2, 3, vec![0.0, 1.0, 0.0]));
2382        g_on.pg = 50.0;
2383        let mut g_off = gen_at(2, gencost(2, 3, vec![0.0, 1.0, 0.0]));
2384        g_off.pg = 30.0;
2385        g_off.in_service = false;
2386        net.generators.push(g_on);
2387        net.generators.push(g_off);
2388
2389        let tables = gridfm_record_batches(&net, 0, &GridfmOptions::default()).unwrap();
2390
2391        // gen_data lists both gens in source order, flags the out-of-service one,
2392        // and keeps its setpoint.
2393        let g = &tables.generator;
2394        assert_eq!(g.num_rows(), 2);
2395        let in_service = col_i64(g, "in_service");
2396        assert_eq!(in_service.value(0), 1, "in-service gen flagged 1");
2397        assert_eq!(in_service.value(1), 0, "out-of-service gen flagged 0");
2398        assert!(
2399            (col_f64(g, "p_mw").value(1) - 30.0).abs() < 1e-12,
2400            "gen_data keeps the out-of-service setpoint"
2401        );
2402
2403        // bus Pg aggregates only in-service generation: bus 1 (dense 0) gets 50,
2404        // bus 2 (dense 1) excludes the out-of-service gen's 30.
2405        let pg = col_f64(&tables.bus, "Pg");
2406        assert!(
2407            (pg.value(0) - 50.0).abs() < 1e-12,
2408            "in-service gen folded into bus Pg"
2409        );
2410        assert!(
2411            pg.value(1) == 0.0,
2412            "out-of-service gen excluded from bus Pg, got {}",
2413            pg.value(1)
2414        );
2415    }
2416
2417    #[test]
2418    fn out_of_service_branch_zeros_flows_but_keeps_admittance() {
2419        // An out-of-service branch keeps its physical Y** admittances but carries
2420        // zero flows and `br_status = 0` — the path datakit's topology variants
2421        // exercise. Use non-flat voltages so an *in-service* branch carries real
2422        // flow, which makes the zero on the tripped branch meaningful (not just an
2423        // artifact of a flat start).
2424        let mut net = Network::in_memory(
2425            "outage",
2426            100.0,
2427            vec![
2428                bus(1, BusType::Ref),
2429                bus(2, BusType::Pq),
2430                bus(3, BusType::Pq),
2431            ],
2432            vec![branch(1, 2, 0.01, 0.1), branch(2, 3, 0.02, 0.2)],
2433        );
2434        net.buses[1].va = -3.0;
2435        net.buses[2].va = -6.0;
2436        net.branches[0].in_service = false; // trip branch 0
2437
2438        let tables = gridfm_record_batches(&net, 0, &GridfmOptions::default()).unwrap();
2439        let br = &tables.branch;
2440        let status = col_i64(br, "br_status");
2441        assert_eq!(status.value(0), 0, "tripped branch reports br_status 0");
2442        assert_eq!(status.value(1), 1, "in-service branch reports br_status 1");
2443
2444        for col in ["pf", "qf", "pt", "qt"] {
2445            let v = col_f64(br, col).value(0);
2446            assert!(
2447                v == 0.0,
2448                "{col} must be zero on the out-of-service branch, got {v}"
2449            );
2450        }
2451        // The in-service branch really carries flow at these voltages — guards
2452        // against a flat-start false pass that would zero every branch anyway.
2453        assert!(
2454            col_f64(br, "pf").value(1).abs() > 1e-6,
2455            "in-service branch should carry nonzero flow"
2456        );
2457        // Admittances are retained for the tripped branch (unlike a zero-impedance
2458        // branch, which zeroes them).
2459        assert!(
2460            col_f64(br, "Yff_i").value(0).abs() > 0.0,
2461            "out-of-service branch keeps its physical Y** admittances"
2462        );
2463    }
2464
2465    fn bus(id: usize, kind: BusType) -> Bus {
2466        Bus {
2467            id: BusId(id),
2468            kind,
2469            vm: 1.0,
2470            va: 0.0,
2471            base_kv: 1.0,
2472            vmax: 1.1,
2473            vmin: 0.9,
2474            area: 1,
2475            zone: 1,
2476            name: None,
2477            extras: Extras::new(),
2478        }
2479    }
2480
2481    fn branch(from: usize, to: usize, r: f64, x: f64) -> Branch {
2482        Branch {
2483            from: BusId(from),
2484            to: BusId(to),
2485            r,
2486            x,
2487            b: 0.0,
2488            rate_a: 0.0,
2489            rate_b: 0.0,
2490            rate_c: 0.0,
2491            tap: 0.0,
2492            shift: 0.0,
2493            in_service: true,
2494            angmin: -360.0,
2495            angmax: 360.0,
2496            extras: Extras::new(),
2497        }
2498    }
2499
2500    fn gencost(model: u8, ncost: usize, coeffs: Vec<f64>) -> GenCost {
2501        GenCost {
2502            model,
2503            startup: 0.0,
2504            shutdown: 0.0,
2505            ncost,
2506            coeffs,
2507        }
2508    }
2509
2510    fn gen_at(bus: usize, cost: GenCost) -> Generator {
2511        Generator {
2512            bus: BusId(bus),
2513            pg: 0.0,
2514            qg: 0.0,
2515            pmax: 100.0,
2516            pmin: 0.0,
2517            qmax: 50.0,
2518            qmin: -50.0,
2519            vg: 1.0,
2520            mbase: 100.0,
2521            in_service: true,
2522            cost: Some(cost),
2523            caps: [None; 11],
2524        }
2525    }
2526
2527    #[test]
2528    fn degenerate_cost_gen_zeros_columns_and_is_counted() {
2529        // Counterpart to the zero-impedance test: a piecewise (model 1) cost row
2530        // gets zeroed cp* columns and is counted; a polynomial row is kept.
2531        let mut net = Network::in_memory(
2532            "degen",
2533            100.0,
2534            vec![bus(1, BusType::Ref), bus(2, BusType::Pq)],
2535            vec![branch(1, 2, 0.01, 0.1)],
2536        );
2537        net.generators
2538            .push(gen_at(1, gencost(1, 2, vec![0.0, 0.0, 1.0, 1.0]))); // piecewise
2539        net.generators
2540            .push(gen_at(2, gencost(2, 3, vec![0.01, 5.0, 0.0]))); // polynomial
2541
2542        let tables = gridfm_record_batches(&net, 0, &GridfmOptions::default()).unwrap();
2543        let g = &tables.generator;
2544        let (cp0, cp1, cp2) = (
2545            col_f64(g, "cp0_eur"),
2546            col_f64(g, "cp1_eur_per_mw"),
2547            col_f64(g, "cp2_eur_per_mw2"),
2548        );
2549        assert_eq!((cp0.value(0), cp1.value(0), cp2.value(0)), (0.0, 0.0, 0.0));
2550        assert_eq!((cp0.value(1), cp1.value(1), cp2.value(1)), (0.0, 5.0, 0.01));
2551
2552        let dir = tempfile::tempdir().unwrap();
2553        let out = write_gridfm_dataset(&net, 0, dir.path(), &GridfmOptions::default()).unwrap();
2554        assert_eq!(out.degenerate_cost_gens, 1);
2555        let meta: serde_json::Value = serde_json::from_str(
2556            &std::fs::read_to_string(out.dir.join("gridfm_meta.json")).unwrap(),
2557        )
2558        .unwrap();
2559        assert_eq!(meta["degenerate_cost_gens"], 1);
2560    }
2561
2562    #[test]
2563    fn scenario_id_and_tap_toggle_take_effect() {
2564        let net = case14();
2565
2566        // The scenario id (an explicit argument now) reaches both id columns.
2567        let bus = gridfm_record_batches(&net, 7, &GridfmOptions::default())
2568            .unwrap()
2569            .bus;
2570        assert_eq!(col_i64(&bus, "scenario").value(0), 7);
2571        assert_eq!(col_i64(&bus, "load_scenario_idx").value(0), 7);
2572
2573        // Turning taps off changes a transformer's admittance columns.
2574        let on = gridfm_record_batches(&net, 0, &GridfmOptions::default())
2575            .unwrap()
2576            .branch;
2577        let off = gridfm_record_batches(
2578            &net,
2579            0,
2580            &GridfmOptions {
2581                include_taps: false,
2582                ..Default::default()
2583            },
2584        )
2585        .unwrap()
2586        .branch;
2587        let tap = col_f64(&on, "tap");
2588        let xfmr = (0..on.num_rows())
2589            .find(|&i| (tap.value(i) - 1.0).abs() > 1e-9)
2590            .expect("case14 has off-nominal transformers");
2591        // case14 transformers are lossless (r = 0), so compare the susceptance
2592        // (imaginary) part, which scales with 1/tap².
2593        assert!(
2594            (col_f64(&on, "Yff_i").value(xfmr) - col_f64(&off, "Yff_i").value(xfmr)).abs() > 1e-9,
2595            "taps off should change the transformer's Yff"
2596        );
2597    }
2598
2599    // --- reader (issue #60) ------------------------------------------------
2600
2601    /// The power-flow-complete fingerprint the read contract preserves: element
2602    /// counts, nodal load / generation totals, branch impedance sums, base_mva,
2603    /// and the reference-bus count. Bus ids, per-element granularity, costs, and
2604    /// HVDC/storage are excluded by the contract.
2605    #[allow(clippy::type_complexity)]
2606    fn fingerprint(
2607        net: &Network,
2608    ) -> (
2609        usize,
2610        usize,
2611        usize,
2612        usize,
2613        f64,
2614        f64,
2615        f64,
2616        f64,
2617        f64,
2618        f64,
2619        f64,
2620    ) {
2621        (
2622            net.buses.len(),
2623            net.branches.len(),
2624            net.generators.len(),
2625            net.buses.iter().filter(|b| b.kind == BusType::Ref).count(),
2626            net.loads.iter().map(|l| l.p).sum(),
2627            net.loads.iter().map(|l| l.q).sum(),
2628            net.generators.iter().map(|g| g.pg).sum(),
2629            net.branches.iter().map(|b| b.r).sum(),
2630            net.branches.iter().map(|b| b.x).sum(),
2631            net.branches.iter().map(|b| b.b).sum(),
2632            net.base_mva,
2633        )
2634    }
2635
2636    fn assert_fingerprint_close(got: &Network, want: &Network) {
2637        let (g, w) = (fingerprint(got), fingerprint(want));
2638        assert_eq!(
2639            (g.0, g.1, g.2, g.3),
2640            (w.0, w.1, w.2, w.3),
2641            "bus/branch/gen/ref counts differ"
2642        );
2643        for (a, b, label) in [
2644            (g.4, w.4, "load P"),
2645            (g.5, w.5, "load Q"),
2646            (g.6, w.6, "gen P"),
2647            (g.7, w.7, "sum r"),
2648            (g.8, w.8, "sum x"),
2649            (g.9, w.9, "sum b"),
2650            (g.10, w.10, "base_mva"),
2651        ] {
2652            assert!((a - b).abs() < 1e-9, "{label} differs: {a} vs {b}");
2653        }
2654    }
2655
2656    #[test]
2657    fn read_round_trips_power_flow_fingerprint() {
2658        let net = case14();
2659        let dir = tempfile::tempdir().unwrap();
2660        write_gridfm_dataset(&net, 0, dir.path(), &GridfmOptions::default()).unwrap();
2661
2662        let read = read_gridfm_dataset(dir.path().join("case14").join("raw"), 0).unwrap();
2663        assert_eq!(read.scenario, 0);
2664        assert_eq!(read.network.source_format, SourceFormat::Gridfm);
2665        assert_eq!(read.network.name, "case14");
2666        assert!(read.network.source.is_none());
2667        assert_fingerprint_close(&read.network, &net);
2668        // The reconstruction is structurally valid (validate() already ran inside).
2669        read.network.validate().unwrap();
2670    }
2671
2672    #[test]
2673    fn read_gridfm_network_pure_path_matches_disk() {
2674        // The in-memory inverse of gridfm_record_batches reproduces the same
2675        // fingerprint with no disk I/O.
2676        let net = case14();
2677        let tables = gridfm_record_batches(&net, 0, &GridfmOptions::default()).unwrap();
2678        let read = read_gridfm_network(&tables, 0, net.base_mva, &net.name).unwrap();
2679        assert_fingerprint_close(&read.network, &net);
2680    }
2681
2682    #[test]
2683    fn read_recovers_shunt_at_base_mva() {
2684        // case14 has a single bus shunt (Bs = 19 at bus 9). The writer divides by
2685        // base_mva; the reader must multiply it back.
2686        let net = case14();
2687        let want_b: f64 = net.shunts.iter().map(|s| s.b).sum();
2688        assert!(want_b.abs() > 1.0, "fixture should have a real shunt");
2689
2690        let tables = gridfm_record_batches(&net, 0, &GridfmOptions::default()).unwrap();
2691        let read = read_gridfm_network(&tables, 0, net.base_mva, &net.name).unwrap();
2692        let got_b: f64 = read.network.shunts.iter().map(|s| s.b).sum();
2693        assert!(
2694            (got_b - want_b).abs() < 1e-9,
2695            "shunt b not recovered at base_mva: {got_b} vs {want_b}"
2696        );
2697    }
2698
2699    #[test]
2700    fn read_scenarios_yields_distinct_networks() {
2701        // A 2-scenario batch (base + load×1.1): each scenario reads back to its own
2702        // Network, and gridfm_base_case picks scenario 0.
2703        let base = case14();
2704        let up = scaled(&base, 1.1);
2705        let snaps = [
2706            GridfmSnapshot {
2707                net: &base,
2708                scenario: 0,
2709            },
2710            GridfmSnapshot {
2711                net: &up,
2712                scenario: 1,
2713            },
2714        ];
2715        let dir = tempfile::tempdir().unwrap();
2716        let out = write_gridfm_batch(&snaps, dir.path(), &GridfmOptions::default()).unwrap();
2717
2718        let reads = read_gridfm_scenarios(&out.dir).unwrap();
2719        assert_eq!(reads.len(), 2);
2720        assert_eq!((reads[0].scenario, reads[1].scenario), (0, 1));
2721
2722        let load0: f64 = reads[0].network.loads.iter().map(|l| l.p).sum();
2723        let load1: f64 = reads[1].network.loads.iter().map(|l| l.p).sum();
2724        assert!(load0 > 0.0);
2725        assert!(
2726            (load1 - 1.1 * load0).abs() < 1e-6,
2727            "scenario 1 load should be 1.1× scenario 0: {load1} vs {load0}"
2728        );
2729
2730        let base_case = gridfm_base_case(&out.dir).unwrap();
2731        assert_fingerprint_close(&base_case.network, &reads[0].network);
2732    }
2733
2734    #[test]
2735    fn read_resolves_lenient_directory_layouts() {
2736        // resolve_raw_dir accepts the leaf raw/ dir, the <case>/ dir, and the
2737        // parent out/ dir.
2738        let net = case14();
2739        let dir = tempfile::tempdir().unwrap();
2740        write_gridfm_dataset(&net, 0, dir.path(), &GridfmOptions::default()).unwrap();
2741        let out = dir.path(); // parent
2742        let case_dir = out.join("case14");
2743        let raw_dir = case_dir.join("raw");
2744        for d in [raw_dir.clone(), case_dir, out.to_path_buf()] {
2745            let read = read_gridfm_dataset(&d, 0)
2746                .unwrap_or_else(|e| panic!("failed to resolve {}: {e}", d.display()));
2747            assert_eq!(read.network.buses.len(), net.buses.len());
2748        }
2749    }
2750
2751    #[test]
2752    fn read_missing_scenario_errors() {
2753        let net = case14();
2754        let tables = gridfm_record_batches(&net, 0, &GridfmOptions::default()).unwrap();
2755        let err = read_gridfm_network(&tables, 99, net.base_mva, &net.name).unwrap_err();
2756        assert!(
2757            matches!(
2758                err,
2759                Error::FormatRead {
2760                    format: "gridfm",
2761                    ..
2762                }
2763            ),
2764            "got {err:?}"
2765        );
2766    }
2767
2768    #[test]
2769    fn read_no_dataset_errors() {
2770        // An empty (but existing) directory: read_dir succeeds, finds nothing.
2771        let dir = tempfile::tempdir().unwrap();
2772        let err = read_gridfm_dataset(dir.path(), 0).unwrap_err();
2773        assert!(
2774            matches!(
2775                err,
2776                Error::FormatRead {
2777                    format: "gridfm",
2778                    ..
2779                }
2780            ),
2781            "got {err:?}"
2782        );
2783        // A non-existent directory: read_dir's IO error is surfaced, not masked.
2784        let missing = dir.path().join("does-not-exist");
2785        let err = read_gridfm_dataset(&missing, 0).unwrap_err();
2786        assert!(
2787            matches!(
2788                err,
2789                Error::FormatRead {
2790                    format: "gridfm",
2791                    ..
2792                }
2793            ),
2794            "got {err:?}"
2795        );
2796    }
2797
2798    #[test]
2799    fn read_defaults_unusable_base_mva_to_100() {
2800        // A manifest base_mva of 0 (or negative/NaN) is unusable — shunt recovery
2801        // scales by it — so the reader defaults to 100 and warns instead of
2802        // silently producing a network with zeroed shunts.
2803        let net = case14();
2804        let dir = tempfile::tempdir().unwrap();
2805        let out = write_gridfm_dataset(&net, 0, dir.path(), &GridfmOptions::default()).unwrap();
2806        let meta_path = out.dir.join("gridfm_meta.json");
2807        let mut meta: serde_json::Value =
2808            serde_json::from_str(&std::fs::read_to_string(&meta_path).unwrap()).unwrap();
2809        meta["base_mva"] = serde_json::json!(0.0);
2810        std::fs::write(&meta_path, serde_json::to_string(&meta).unwrap()).unwrap();
2811
2812        let read = read_gridfm_dataset(&out.dir, 0).unwrap();
2813        assert!(
2814            (read.network.base_mva - 100.0).abs() < 1e-9,
2815            "base_mva should default to 100, got {}",
2816            read.network.base_mva
2817        );
2818        assert!(
2819            read.warnings.iter().any(|w| w.contains("base_mva")),
2820            "expected a base_mva warning, got {:?}",
2821            read.warnings
2822        );
2823    }
2824
2825    #[test]
2826    fn read_surfaces_fidelity_warnings() {
2827        let net = case14();
2828        let tables = gridfm_record_batches(&net, 0, &GridfmOptions::default()).unwrap();
2829        let read = read_gridfm_network(&tables, 0, net.base_mva, &net.name).unwrap();
2830        assert!(!read.warnings.is_empty());
2831        assert!(
2832            read.warnings
2833                .iter()
2834                .any(|w| w.contains("synthesized bus ids")),
2835            "expected the bus-id synthesis warning, got {:?}",
2836            read.warnings
2837        );
2838        // case14 has loads and a shunt, so those folding warnings appear too.
2839        assert!(read.warnings.iter().any(|w| w.contains("nodal load")));
2840        assert!(read.warnings.iter().any(|w| w.contains("nodal shunts")));
2841    }
2842
2843    #[test]
2844    fn read_recovers_gen_vg_from_bus_vm() {
2845        // gridfm has no gen vg column; vg is recovered from the gen's bus Vm.
2846        // case14's slack bus 1 sits at Vm = 1.06, so its generator reads vg ≈ 1.06
2847        // (not the old hard-coded 1.0).
2848        let net = case14();
2849        let tables = gridfm_record_batches(&net, 0, &GridfmOptions::default()).unwrap();
2850        let read = read_gridfm_network(&tables, 0, net.base_mva, &net.name).unwrap();
2851        for g in &read.network.generators {
2852            let bus = read
2853                .network
2854                .buses
2855                .iter()
2856                .find(|b| b.id == g.bus)
2857                .expect("gen references a known bus");
2858            assert!(
2859                (g.vg - bus.vm).abs() < 1e-12,
2860                "vg should equal the bus Vm: {} vs {}",
2861                g.vg,
2862                bus.vm
2863            );
2864        }
2865        assert!(
2866            read.network
2867                .generators
2868                .iter()
2869                .any(|g| (g.vg - 1.0).abs() > 1e-3),
2870            "expected a generator with vg != 1.0 (case14's slack is at 1.06)"
2871        );
2872    }
2873
2874    #[test]
2875    fn read_maps_unit_tap_lines_back_to_zero() {
2876        // The writer stores effective tap (a line's raw 0 becomes 1.0). The reader
2877        // must map unit tap + no shift back to raw tap 0 so lines stay lines;
2878        // otherwise every line reads as a transformer and a read→write to PSS/E /
2879        // PowerWorld misclassifies them. case14 has both lines and off-nominal
2880        // transformers, so the line/transformer split must survive the round trip.
2881        let net = case14();
2882        let n_lines = net.branches.iter().filter(|b| !b.is_transformer()).count();
2883        let n_xfmr = net.branches.iter().filter(|b| b.is_transformer()).count();
2884        assert!(
2885            n_lines > 0 && n_xfmr > 0,
2886            "fixture needs both lines and transformers"
2887        );
2888
2889        let tables = gridfm_record_batches(&net, 0, &GridfmOptions::default()).unwrap();
2890        let read = read_gridfm_network(&tables, 0, net.base_mva, &net.name).unwrap();
2891        let read_lines = read
2892            .network
2893            .branches
2894            .iter()
2895            .filter(|b| !b.is_transformer())
2896            .count();
2897        let read_xfmr = read
2898            .network
2899            .branches
2900            .iter()
2901            .filter(|b| b.is_transformer())
2902            .count();
2903        assert_eq!(
2904            read_lines, n_lines,
2905            "lines must read back as lines (raw tap 0)"
2906        );
2907        assert_eq!(
2908            read_xfmr, n_xfmr,
2909            "transformers must keep their off-nominal ratio"
2910        );
2911        assert!(
2912            read.warnings.iter().any(|w| w.contains("read as lines")),
2913            "expected the unit-tap warning, got {:?}",
2914            read.warnings
2915        );
2916    }
2917
2918    #[test]
2919    fn read_allows_a_case_with_no_generators() {
2920        // gen_data may be legitimately empty (a power-flow case with no mpc.gen);
2921        // the scenario guard must not reject it — only a *partial* table (rows for
2922        // other scenarios but not this one) is an error.
2923        let net = Network::in_memory(
2924            "nogen",
2925            100.0,
2926            vec![bus(1, BusType::Ref), bus(2, BusType::Pq)],
2927            vec![branch(1, 2, 0.01, 0.1)],
2928        );
2929        let tables = gridfm_record_batches(&net, 0, &GridfmOptions::default()).unwrap();
2930        let read = read_gridfm_network(&tables, 0, net.base_mva, &net.name).unwrap();
2931        assert!(read.network.generators.is_empty());
2932        assert_eq!(read.network.branches.len(), 1);
2933    }
2934
2935    #[test]
2936    fn read_all_zero_cost_reads_as_none_with_ambiguity_warning() {
2937        // A genuine zero polynomial cost writes (0,0,0), indistinguishable from a
2938        // no-cost generator or a zeroed unrepresentable cost; the reader reads None
2939        // and the warning describes the ambiguity (not a false "piecewise/cubic").
2940        let mut net = Network::in_memory(
2941            "zerocost",
2942            100.0,
2943            vec![bus(1, BusType::Ref), bus(2, BusType::Pq)],
2944            vec![branch(1, 2, 0.01, 0.1)],
2945        );
2946        net.generators
2947            .push(gen_at(1, gencost(2, 3, vec![0.0, 0.0, 0.0])));
2948        let tables = gridfm_record_batches(&net, 0, &GridfmOptions::default()).unwrap();
2949        let read = read_gridfm_network(&tables, 0, net.base_mva, &net.name).unwrap();
2950        assert!(
2951            read.network.generators[0].cost.is_none(),
2952            "all-zero cost should read back as None"
2953        );
2954        assert!(
2955            read.warnings
2956                .iter()
2957                .any(|w| w.contains("read with no cost")),
2958            "expected the no-cost ambiguity warning, got {:?}",
2959            read.warnings
2960        );
2961    }
2962
2963    #[test]
2964    fn require_scenario_block_flags_partial_tables() {
2965        // Empty table → ok (a legitimately element-less case). Present → ok. Absent
2966        // from a non-empty table → error (the partial/corrupt dataset Copilot flagged).
2967        assert!(require_scenario_block(&[], 0, &[], "gen_data").is_ok());
2968        assert!(require_scenario_block(&[0, 0, 1], 0, &[0, 1], "gen_data").is_ok());
2969        let err = require_scenario_block(&[0, 0], 1, &[], "branch_data").unwrap_err();
2970        assert!(
2971            matches!(
2972                err,
2973                Error::FormatRead {
2974                    format: "gridfm",
2975                    ..
2976                }
2977            ),
2978            "got {err:?}"
2979        );
2980    }
2981}