diff --git a/docs/reference/api/function_data.md b/docs/reference/api/function_data.md new file mode 100644 index 0000000..bb138e5 --- /dev/null +++ b/docs/reference/api/function_data.md @@ -0,0 +1,9 @@ +```{eval-rst} +.. _function-data-api: +``` +# Function data + +```{eval-rst} +.. automodule:: infrasys.function_data + :members: +``` diff --git a/docs/reference/api/index.md b/docs/reference/api/index.md index 760627b..e6027c6 100644 --- a/docs/reference/api/index.md +++ b/docs/reference/api/index.md @@ -13,4 +13,5 @@ time_series location quantities + function_data ``` diff --git a/pyproject.toml b/pyproject.toml index 0223958..90bc50e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -83,12 +83,13 @@ target-version = "py311" [tool.ruff.lint] # Enable Pyflakes (`F`) and a subset of the pycodestyle (`E`) codes by default. select = [ - "C901", # McCabe complexity - "E4", # Subset of pycodestyle (E) + "C901", # McCabe complexity + "E4", # Subset of pycodestyle (E) "E7", "E9", - "F", # Pyflakes - "W", # pycodestyle warnings + "EM", # string formatting in an exception message + "F", # Pyflakes + "W", # pycodestyle warnings ] ignore = [] diff --git a/src/infrasys/base_quantity.py b/src/infrasys/base_quantity.py index edb33db..6908c1e 100644 --- a/src/infrasys/base_quantity.py +++ b/src/infrasys/base_quantity.py @@ -24,7 +24,8 @@ def __new__(cls: Type["BaseQuantity"], value, units=None): def __init_subclass__(cls, **kwargs): if not cls.__base_unit__: - raise TypeError("__base_unit__ should be defined") + msg = "__base_unit__ should be defined" + raise TypeError(msg) super().__init_subclass__(**kwargs) # Required for pydantic validation diff --git a/src/infrasys/component_manager.py b/src/infrasys/component_manager.py index df9a9d0..88e7c2a 100644 --- a/src/infrasys/component_manager.py +++ b/src/infrasys/component_manager.py @@ -256,7 +256,8 @@ def deepcopy(self, component: Component) -> Component: def change_uuid(self, component: Component) -> None: """Change the component UUID.""" - raise NotImplementedError("change_component_uuid") + msg = "change_component_uuid" + raise NotImplementedError(msg) def update( self, diff --git a/src/infrasys/cost_curves.py b/src/infrasys/cost_curves.py new file mode 100644 index 0000000..4f4b7f9 --- /dev/null +++ b/src/infrasys/cost_curves.py @@ -0,0 +1,59 @@ +from typing_extensions import Annotated +from infrasys.component import Component +from pydantic import Field +from infrasys.value_curves import InputOutputCurve, IncrementalCurve, AverageRateCurve +from infrasys.function_data import LinearFunctionData + + +class ProductionVariableCostCurve(Component): + name: Annotated[str, Field(frozen=True)] = "" + + +class CostCurve(ProductionVariableCostCurve): + """Direct representation of the variable operation cost of a power plant in currency. + + Composed of a Value Curve that may represent input-output, incremental, or average rate + data. The default units for the x-axis are MW and can be specified with + `power_units`. + """ + + value_curve: Annotated[ + InputOutputCurve | IncrementalCurve | AverageRateCurve, + Field( + description="The underlying `ValueCurve` representation of this `ProductionVariableCostCurve`" + ), + ] + vom_units: Annotated[ + InputOutputCurve, + Field(description="(default: natural units (MW)) The units for the x-axis of the curve"), + ] = InputOutputCurve( + function_data=LinearFunctionData(proportional_term=0.0, constant_term=0.0) + ) + + +class FuelCurve(ProductionVariableCostCurve): + """Representation of the variable operation cost of a power plant in terms of fuel. + + Fuel units (MBTU, liters, m^3, etc.) coupled with a conversion factor between fuel and currency. + Composed of a Value Curve that may represent input-output, incremental, or average rate data. + The default units for the x-axis are MW and can be specified with `power_units`. + """ + + value_curve: Annotated[ + InputOutputCurve | IncrementalCurve | AverageRateCurve, + Field( + description="The underlying `ValueCurve` representation of this `ProductionVariableCostCurve`" + ), + ] + vom_units: Annotated[ + InputOutputCurve, + Field(description="(default: natural units (MW)) The units for the x-axis of the curve"), + ] = InputOutputCurve( + function_data=LinearFunctionData(proportional_term=0.0, constant_term=0.0) + ) + fuel_cost: Annotated[ + float, + Field( + description="Either a fixed value for fuel cost or the key to a fuel cost time series" + ), + ] diff --git a/src/infrasys/function_data.py b/src/infrasys/function_data.py index 551f8e7..182f19b 100644 --- a/src/infrasys/function_data.py +++ b/src/infrasys/function_data.py @@ -15,49 +15,57 @@ class XYCoords(NamedTuple): y: float -class LinearFunctionData(Component): +class FunctionData(Component): + """BaseClass of FunctionData""" + + name: Annotated[str, Field(frozen=True)] = "" + + +class LinearFunctionData(FunctionData): """Data representation for linear cost function. - Class to represent the underlying data of linear functions. Principally used for - the representation of cost functions `f(x) = proportional_term*x + constant_term`. + Used to represent linear cost functions of the form + + .. math:: f(x) = mx + c, + + where :math:`m` is the proportional term and :math:`c` is the constant term. """ - name: Annotated[str, Field(frozen=True)] = "" proportional_term: Annotated[ - float, Field(description="the proportional term in the represented function") + float, Field(description="the proportional term in the represented function.") ] constant_term: Annotated[ - float, Field(description="the constant term in the represented function") + float, Field(description="the constant term in the represented function.") ] -class QuadraticFunctionData(Component): - """Data representation for quadratic cost functions. +class QuadraticFunctionData(FunctionData): + """Data representation for quadratic cost function. + + Used to represent quadratic of cost functions of the form + + .. math:: f(x) = ax^2 + bx + c, - Class to represent the underlying data of quadratic functions. Principally used for the - representation of cost functions - `f(x) = quadratic_term*x^2 + proportional_term*x + constant_term`. + where :math:`a` is the quadratic term, :math:`b` is the proportional term and :math:`c` is the + constant term. """ - name: Annotated[str, Field(frozen=True)] = "" quadratic_term: Annotated[ - float, Field(description="the quadratic term in the represented function") + float, Field(description="the quadratic term in the represented function.") ] proportional_term: Annotated[ - float, Field(description="the proportional term in the represented function") + float, Field(description="the proportional term in the represented function.") ] constant_term: Annotated[ - float, Field(description="the constant term in the represented function") + float, Field(description="the constant term in the represented function.") ] def validate_piecewise_linear_x(points: List[XYCoords]) -> List[XYCoords]: """Validates the x data for PiecewiseLinearData class - Function used to validate given x data for the PiecewiseLinearData class. - X data is checked to ensure there is at least two values of x, - which is the minimum required to generate a cost curve, and is - given in ascending order (e.g. [1, 2, 3], not [1, 3, 2]). + X data is checked to ensure there is at least two values of x, which is the minimum required to + generate a cost curve, and is given in ascending order (e.g. [1, 2, 3], not [1, 3, 2]). Parameters ---------- @@ -65,7 +73,7 @@ def validate_piecewise_linear_x(points: List[XYCoords]) -> List[XYCoords]: List of named tuples of (x,y) coordinates for cost function Returns - ---------- + ------- points : List[XYCoords] List of (x,y) data for cost function after successful validation. """ @@ -73,12 +81,14 @@ def validate_piecewise_linear_x(points: List[XYCoords]) -> List[XYCoords]: x_coords = [p.x for p in points] if len(x_coords) < 2: - raise ValueError("Must specify at least two x-coordinates") + msg = "Must specify at least two x-coordinates." + raise ValueError(msg) if not ( x_coords == sorted(x_coords) or (np.isnan(x_coords[0]) and x_coords[1:] == sorted(x_coords[1:])) ): - raise ValueError(f"Piecewise x-coordinates must be ascending, got {x_coords}") + msg = f"Piecewise x-coordinates must be ascending, got {x_coords}." + raise ValueError(msg) return points @@ -86,10 +96,8 @@ def validate_piecewise_linear_x(points: List[XYCoords]) -> List[XYCoords]: def validate_piecewise_step_x(x_coords: List[float]) -> List[float]: """Validates the x data for PiecewiseStepData class - Function used to validate given x data for the PiecewiseStepData class. - X data is checked to ensure there is at least two values of x, - which is the minimum required to generate a cost curve, and is - given in ascending order (e.g. [1, 2, 3], not [1, 3, 2]). + X data is checked to ensure there is at least two values of x, which is the minimum required to + generate a cost curve, and is given in ascending order (e.g. [1, 2, 3], not [1, 3, 2]). Parameters ---------- @@ -97,61 +105,64 @@ def validate_piecewise_step_x(x_coords: List[float]) -> List[float]: List of x data for cost function. Returns - ---------- + ------- x_coords : List[float] List of x data for cost function after successful validation. """ if len(x_coords) < 2: - raise ValueError("Must specify at least two x-coordinates") + msg = "Must specify at least two x-coordinates." + raise ValueError(msg) if not ( x_coords == sorted(x_coords) or (np.isnan(x_coords[0]) and x_coords[1:] == sorted(x_coords[1:])) ): - raise ValueError(f"Piecewise x-coordinates must be ascending, got {x_coords}") + msg = f"Piecewise x-coordinates must be ascending, got {x_coords}." + raise ValueError(msg) return x_coords -class PiecewiseLinearData(Component): +class PiecewiseLinearData(FunctionData): """Data representation for piecewise linear cost function. - Class to represent piecewise linear data as a series of points: two points define one - segment, three points define two segments, etc. The curve starts at the first point given, - not the origin. Principally used for the representation of cost functions where the points - store quantities (x, y), such as (MW, USD/h). + Used to represent linear data as a series of points: two points define one segment, three + points define two segments, etc. The curve starts at the first point given, not the origin. + Principally used for the representation of cost functions where the points store quantities (x, + y), such as (MW, USD/h). """ - name: Annotated[str, Field(frozen=True)] = "" points: Annotated[ List[XYCoords], AfterValidator(validate_piecewise_linear_x), - Field(description="list of (x,y) points that define the function"), + Field(description="list of (x,y) points that define the function."), ] -class PiecewiseStepData(Component): +class PiecewiseStepData(FunctionData): """Data representation for piecewise step cost function. - Class to represent a step function as a series of endpoint x-coordinates and segment + Used to represent a step function as a series of endpoint x-coordinates and segment y-coordinates: two x-coordinates and one y-coordinate defines a single segment, three - x-coordinates and two y-coordinates define two segments, etc. This can be useful to - represent the derivative of a `PiecewiseLinearData`, where the y-coordinates of this - step function represent the slopes of that piecewise linear function. - Principally used for the representation of cost functions where the points store - quantities (x, dy/dx), such as (MW, USD/MWh). + x-coordinates and two y-coordinates define two segments, etc. + + This can be useful to represent the derivative of a :class:`PiecewiseLinearData`, where the + y-coordinates of this step function represent the slopes of that piecewise linear function. + Principally used for the representation of cost functions where the points store quantities (x, + :math:`dy/dx`), such as (MW, USD/MWh). """ - name: Annotated[str, Field(frozen=True)] = "" x_coords: Annotated[ List[float], - Field(description="the x-coordinates of the endpoints of the segments"), + Field(description="the x-coordinates of the endpoints of the segments."), ] y_coords: Annotated[ List[float], Field( - description="the y-coordinates of the segments: `y_coords[1]` is the y-value \ - between `x_coords[0]` and `x_coords[1]`, etc. Must have one fewer elements than `x_coords`." + description=( + "The y-coordinates of the segments: `y_coords[1]` is the y-value between " + "`x_coords[0]` and `x_coords[1]`, etc. Must have one fewer elements than `x_coords`." + ) ), ] @@ -159,14 +170,34 @@ class PiecewiseStepData(Component): def validate_piecewise_xy(self): """Method to validate the x and y data for PiecewiseStepData class - Model validator used to validate given data for the PiecewiseStepData class. - Calls `validate_piecewise_step_x` to check if `x_coords` is valid, then checks if - the length of `y_coords` is exactly one less than `x_coords`, which is necessary - to define the cost functions correctly. + Model validator used to validate given data for the :class:`PiecewiseStepData`. Calls + `validate_piecewise_step_x` to check if `x_coords` is valid, then checks if the length of + `y_coords` is exactly one less than `x_coords`, which is necessary to define the cost + functions correctly. """ validate_piecewise_step_x(self.x_coords) if len(self.y_coords) != len(self.x_coords) - 1: - raise ValueError("Must specify one fewer y-coordinates than x-coordinates") + msg = "Must specify one fewer y-coordinates than x-coordinates" + raise ValueError(msg) return self + + +def get_x_lengths(x_coords: List[float]) -> List[float]: + return np.subtract(x_coords[1:], x_coords[:-1]) + + +def running_sum(data: PiecewiseStepData) -> List[XYCoords]: + points = [] + slopes = data.y_coords + x_coords = data.x_coords + x_lengths = get_x_lengths(x_coords) + running_y = 0.0 + + points.append(XYCoords(x=x_coords[0], y=running_y)) + for prev_slope, this_x, dx in zip(slopes, x_coords[1:], x_lengths): + running_y += prev_slope * dx + points.append(XYCoords(x=this_x, y=running_y)) + + return points diff --git a/src/infrasys/parquet_time_series_storage.py b/src/infrasys/parquet_time_series_storage.py index cbb43c6..8ad2515 100644 --- a/src/infrasys/parquet_time_series_storage.py +++ b/src/infrasys/parquet_time_series_storage.py @@ -19,7 +19,8 @@ def get_time_series( start_time: datetime | None = None, length: int | None = None, ) -> TimeSeriesData: - raise NotImplementedError("ParquetTimeSeriesStorage.get_time_series") + msg = "ParquetTimeSeriesStorage.get_time_series" + raise NotImplementedError(msg) def remove_time_series(self, uuid: UUID) -> None: ... diff --git a/src/infrasys/system.py b/src/infrasys/system.py index 611d743..deaa055 100644 --- a/src/infrasys/system.py +++ b/src/infrasys/system.py @@ -154,7 +154,8 @@ def to_json(self, filename: Path | str, overwrite=False, indent=None, data=None) data = system_data else: if "system" in data: - raise ISConflictingArguments("data contains the key 'system'") + msg = "data contains the key 'system'" + raise ISConflictingArguments(msg) data["system"] = system_data with open(filename, "w", encoding="utf-8") as f_out: json.dump(data, f_out, indent=indent) @@ -337,9 +338,8 @@ def save( fpath = Path(fpath) if fpath.exists() and not overwrite: - raise FileExistsError( - f"{fpath} exists already. To overwrite the folder pass `overwrite=True`" - ) + msg = f"{fpath} exists already. To overwrite the folder pass `overwrite=True`" + raise FileExistsError(msg) fpath.mkdir(parents=True, exist_ok=True) self.to_json(fpath / filename, overwrite=overwrite) @@ -991,7 +991,8 @@ def handle_data_format_upgrade( def merge_system(self, other: "System") -> None: """Merge the contents of another system into this one.""" - raise NotImplementedError("merge_system") + msg = "merge_system" + raise NotImplementedError(msg) # TODO: add delete methods that (1) don't raise if not found and (2) don't return anything? diff --git a/src/infrasys/time_series_manager.py b/src/infrasys/time_series_manager.py index 2bd428a..7013387 100644 --- a/src/infrasys/time_series_manager.py +++ b/src/infrasys/time_series_manager.py @@ -81,7 +81,8 @@ def add( """ self._handle_read_only() if not components: - raise ISOperationNotAllowed("add_time_series requires at least one component") + msg = "add_time_series requires at least one component" + raise ISOperationNotAllowed(msg) ts_type = type(time_series) if not issubclass(ts_type, TimeSeriesData): @@ -274,4 +275,5 @@ def deserialize( def _handle_read_only(self) -> None: if self._read_only: - raise ISOperationNotAllowed("Cannot modify time series in read-only mode.") + msg = "Cannot modify time series in read-only mode." + raise ISOperationNotAllowed(msg) diff --git a/src/infrasys/time_series_metadata_store.py b/src/infrasys/time_series_metadata_store.py index cb0b418..e1bb195 100644 --- a/src/infrasys/time_series_metadata_store.py +++ b/src/infrasys/time_series_metadata_store.py @@ -376,9 +376,8 @@ def remove( self._execute(cur, query) count_deleted = self._execute(cur, "SELECT changes()").fetchall()[0][0] if count_deleted != len(ids): - raise Exception( - f"Bug: Unexpected length mismatch {len(ts_uuids)=} {count_deleted=}" - ) + msg = f"Bug: Unexpected length mismatch {len(ts_uuids)=} {count_deleted=}" + raise Exception(msg) self._con.commit() return list(ts_uuids) @@ -393,7 +392,8 @@ def remove( self._con.commit() count_deleted = self._execute(cur, "SELECT changes()").fetchall()[0][0] if len(uuids) != count_deleted: - raise Exception(f"Bug: Unexpected length mismatch: {len(uuids)=} {count_deleted=}") + msg = f"Bug: Unexpected length mismatch: {len(uuids)=} {count_deleted=}" + raise Exception(msg) return uuids def sql(self, query: str) -> list[tuple]: @@ -416,7 +416,8 @@ def _insert_rows(self, rows: list[tuple]) -> None: def _make_components_str(self, *components: Component) -> str: if not components: - raise ISOperationNotAllowed("At least one component must be passed.") + msg = "At least one component must be passed." + raise ISOperationNotAllowed(msg) or_clause = "OR ".join([f"component_uuid = '{x.uuid}'" for x in components]) return f"({or_clause})" diff --git a/src/infrasys/value_curves.py b/src/infrasys/value_curves.py new file mode 100644 index 0000000..96d1912 --- /dev/null +++ b/src/infrasys/value_curves.py @@ -0,0 +1,211 @@ +"""Defines classes for value curves using cost functions""" + +from typing_extensions import Annotated +from infrasys.component import Component +from infrasys.exceptions import ISOperationNotAllowed +from infrasys.function_data import ( + LinearFunctionData, + QuadraticFunctionData, + PiecewiseLinearData, + PiecewiseStepData, + running_sum, +) +from pydantic import Field +import numpy as np + + +class ValueCurve(Component): + name: Annotated[str, Field(frozen=True)] = "" + input_at_zero: Annotated[ + float | None, + Field( + description="Optional, an explicit representation of the input value at zero output." + ), + ] = None + + +class InputOutputCurve(ValueCurve): + """Input-output curve relating production quality to cost. + + An input-output curve, directly relating the production quantity to the cost: + + .. math:: y = f(x). + + Can be used, for instance, in the representation of a Cost Curve where :math:`x` is MW and + :math:`y` is currency/hr, or in the representation of a Fuel Curve where :math:`x` is MW and + :math:`y` is fuel/hr. + """ + + function_data: Annotated[ + LinearFunctionData | QuadraticFunctionData | PiecewiseLinearData, + Field(description="The underlying `FunctionData` representation of this `ValueCurve`"), + ] + + +class IncrementalCurve(ValueCurve): + """Incremental/marginal curve to relate production quantity to cost derivative. + + An incremental (or 'marginal') curve, relating the production quantity to the derivative of + cost: + + ..math:: y = f'(x). + + Can be used, for instance, in the representation of a Cost Curve + where :math:`x` is MW and :math:`y` is currency/MWh, or in the representation of a Fuel Curve + where :math:`x` is MW and :math:`y` is fuel/MWh. + """ + + function_data: Annotated[ + LinearFunctionData | PiecewiseStepData, + Field(description="The underlying `FunctionData` representation of this `ValueCurve`"), + ] + initial_input: Annotated[ + float | None, + Field( + description="The value of f(x) at the least x for which the function is defined, or \ + the origin for functions with no left endpoint, used for conversion to `InputOutputCurve`" + ), + ] + + def to_input_output(self) -> InputOutputCurve: + """Function to convert IncrementalCurve to InputOutputCurve + + Function takes an IncrementalCurve and converts it to a corresponding + InputOutputCurve depending on the type of function_data. If the IncrementalCurve + uses LinearFunctionData, the new InputOutputCurve is created linear or quadratic data + that correspond to the integral of the original linear function. If the input uses + PiecewiseStepData, the slopes of each segment are used to calculate the corresponding + y values for each x value and used to construct PiecewiseLinearData for the InputOutputCurve. + + Parameters + ---------- + data : IncrementalCurve + Original IncrementalCurve for conversion. + + Returns + ---------- + InputOutputCurve + InputOutputCurve using either QuadraticFunctionData or PiecewiseStepData. + """ + match self.function_data: + case LinearFunctionData(): + p = self.function_data.proportional_term + m = self.function_data.constant_term + + c = self.initial_input + if c is None: + msg = "Cannot convert `IncrementalCurve` with undefined `initial_input`" + raise ISOperationNotAllowed(msg) + + if p == 0: + return InputOutputCurve( + function_data=LinearFunctionData(proportional_term=m, constant_term=c) + ) + else: + return InputOutputCurve( + function_data=QuadraticFunctionData( + quadratic_term=p / 2, proportional_term=m, constant_term=c + ), + input_at_zero=self.input_at_zero, + ) + case PiecewiseStepData(): + c = self.initial_input + if c is None: + msg = "Cannot convert `IncrementalCurve` with undefined `initial_input`" + raise ISOperationNotAllowed(msg) + + points = running_sum(self.function_data) + + return InputOutputCurve( + function_data=PiecewiseLinearData(points=[(p.x, p.y + c) for p in points]), + input_at_zero=self.input_at_zero, + ) + + +class AverageRateCurve(ValueCurve): + """Average rate curve relating production quality to average cost rate. + + An average rate curve, relating the production quantity to the average cost rate from the + origin: + + .. math:: y = f(x)/x. + + Can be used, for instance, in the representation of a + Cost Curve where :math:`x` is MW and :math:`y` is currency/MWh, or in the representation of a + Fuel Curve where :math:`x` is MW and :math:`y` is fuel/MWh. Typically calculated by dividing + absolute values of cost rate or fuel input rate by absolute values of electric power. + """ + + function_data: Annotated[ + LinearFunctionData | PiecewiseStepData, + Field( + description="The underlying `FunctionData` representation of this `ValueCurve`, or \ + only the oblique asymptote when using `LinearFunctionData`" + ), + ] + initial_input: Annotated[ + float | None, + Field( + description="The value of f(x) at the least x for which the function is defined, or \ + the origin for functions with no left endpoint, used for conversion to `InputOutputCurve`" + ), + ] + + def to_input_output(self) -> InputOutputCurve: + """Function to convert IncrementalCurve to InputOutputCurve + + Function takes an AverageRateCurve and converts it to a corresponding + InputOutputCurve depending on the type of function_data. If the AverageRateCurve + uses LinearFunctionData, the new InputOutputCurve is created with either linear or quadratic + function data, depending on if the original function data is constant or linear. If the + input uses PiecewiseStepData, new y-values are calculated for each x value such that `f(x) = x*y` + and used to construct PiecewiseLinearData for the InputOutputCurve. + + Parameters + ---------- + data : AverageRateCurve + Original AverageRateCurve for conversion. + + Returns + ---------- + InputOutputCurve + InputOutputCurve using either QuadraticFunctionData or PiecewiseStepData. + """ + match self.function_data: + case LinearFunctionData(): + p = self.function_data.proportional_term + m = self.function_data.constant_term + + c = self.initial_input + if c is None: + msg = "Cannot convert `AverageRateCurve` with undefined `initial_input`" + raise ISOperationNotAllowed(msg) + + if p == 0: + return InputOutputCurve( + function_data=LinearFunctionData(proportional_term=m, constant_term=c) + ) + else: + return InputOutputCurve( + function_data=QuadraticFunctionData( + quadratic_term=p, proportional_term=m, constant_term=c + ), + input_at_zero=self.input_at_zero, + ) + case PiecewiseStepData(): + c = self.initial_input + if c is None: + msg = "Cannot convert `AverageRateCurve` with undefined `initial_input`" + raise ISOperationNotAllowed(msg) + + xs = self.function_data.x_coords + ys = np.multiply(xs[1:], self.function_data.y_coords).tolist() + ys.insert(0, c) + + return InputOutputCurve( + function_data=PiecewiseLinearData(points=list(zip(xs, ys))), + input_at_zero=self.input_at_zero, + ) + case _: + msg = "Function is not valid for the type of data provided." + raise ISOperationNotAllowed(msg) diff --git a/tests/test_cost_curves.py b/tests/test_cost_curves.py new file mode 100644 index 0000000..dae6feb --- /dev/null +++ b/tests/test_cost_curves.py @@ -0,0 +1,100 @@ +from infrasys.cost_curves import CostCurve, FuelCurve +from infrasys.function_data import LinearFunctionData +from infrasys.value_curves import InputOutputCurve +from infrasys import Component +from .models.simple_system import SimpleSystem + + +class CurveComponent(Component): + cost_curve: CostCurve + + +def test_cost_curve(): + # Cost curve + cost_curve = CostCurve( + value_curve=InputOutputCurve( + function_data=LinearFunctionData(proportional_term=1.0, constant_term=2.0) + ), + vom_units=InputOutputCurve( + function_data=LinearFunctionData(proportional_term=2.0, constant_term=1.0) + ), + ) + + assert cost_curve.value_curve.function_data.proportional_term == 1.0 + assert cost_curve.vom_units.function_data.proportional_term == 2.0 + + +def test_fuel_curve(): + # Fuel curve + fuel_curve = FuelCurve( + value_curve=InputOutputCurve( + function_data=LinearFunctionData(proportional_term=1.0, constant_term=2.0) + ), + vom_units=InputOutputCurve( + function_data=LinearFunctionData(proportional_term=2.0, constant_term=1.0) + ), + fuel_cost=2.5, + ) + + assert fuel_curve.value_curve.function_data.proportional_term == 1.0 + assert fuel_curve.vom_units.function_data.proportional_term == 2.0 + assert fuel_curve.fuel_cost == 2.5 + + +def test_value_curve_custom_serialization(): + component = CurveComponent( + name="test", + cost_curve=CostCurve( + value_curve=InputOutputCurve( + function_data=LinearFunctionData(proportional_term=1.0, constant_term=2.0) + ), + vom_units=InputOutputCurve( + function_data=LinearFunctionData(proportional_term=2.0, constant_term=1.0) + ), + ), + ) + + model_dump = component.model_dump(mode="json") + assert model_dump["cost_curve"]["value_curve"]["function_data"]["proportional_term"] == 1.0 + + model_dump = component.model_dump(context={"magnitude_only": True}) + assert model_dump["cost_curve"]["value_curve"]["function_data"]["proportional_term"] == 1.0 + + model_dump = component.model_dump(mode="json", context={"magnitude_only": True}) + assert model_dump["cost_curve"]["value_curve"]["function_data"]["proportional_term"] == 1.0 + + +def test_value_curve_serialization(tmp_path): + system = SimpleSystem(auto_add_composed_components=True) + + v1 = CurveComponent( + name="test", + cost_curve=CostCurve( + value_curve=InputOutputCurve( + function_data=LinearFunctionData(proportional_term=1.0, constant_term=2.0) + ), + vom_units=InputOutputCurve( + function_data=LinearFunctionData(proportional_term=2.0, constant_term=1.0) + ), + ), + ) + system.add_component(v1) + filename = tmp_path / "value_curve.json" + + system.to_json(filename, overwrite=True) + system2 = SimpleSystem.from_json(filename) + + assert system2 is not None + + v2 = system2.get_component(CurveComponent, "test") + + assert v2 is not None + assert isinstance(v1.cost_curve.value_curve.function_data, LinearFunctionData) + assert ( + v1.cost_curve.value_curve.function_data.proportional_term + == v2.cost_curve.value_curve.function_data.proportional_term + ) + assert ( + v1.cost_curve.value_curve.function_data.constant_term + == v2.cost_curve.value_curve.function_data.constant_term + ) diff --git a/tests/test_function_data.py b/tests/test_function_data.py index 3888373..57e683c 100644 --- a/tests/test_function_data.py +++ b/tests/test_function_data.py @@ -1,7 +1,19 @@ -from infrasys.function_data import PiecewiseStepData, PiecewiseLinearData, XYCoords +from infrasys.function_data import ( + LinearFunctionData, + PiecewiseStepData, + PiecewiseLinearData, + XYCoords, + running_sum, +) +from infrasys import Component +from .models.simple_system import SimpleSystem import pytest +class FunctionDataComponent(Component): + function_data: LinearFunctionData + + def test_xycoords(): test_xy = XYCoords(x=1.0, y=2.0) @@ -29,22 +41,76 @@ def test_piecewise_linear(): def test_piecewise_step(): # Check minimum x values - test_x = [2] - test_y = [1] + test_x = [2.0] + test_y = [1.0] with pytest.raises(ValueError): PiecewiseStepData(x_coords=test_x, y_coords=test_y) # Check ascending x values - test_x = [1, 4, 3] - test_y = [2, 4] + test_x = [1.0, 4.0, 3.0] + test_y = [2.0, 4.0] with pytest.raises(ValueError): PiecewiseStepData(x_coords=test_x, y_coords=test_y) # Check length of x and y lists - test_x = [1, 2, 3] - test_y = [2, 4, 3] + test_x = [1.0, 2.0, 3.0] + test_y = [2.0, 4.0, 3.0] with pytest.raises(ValueError): PiecewiseStepData(x_coords=test_x, y_coords=test_y) + + +def test_function_data_custom_serialization(): + component = FunctionDataComponent( + name="test", function_data=LinearFunctionData(proportional_term=1.0, constant_term=2.0) + ) + + model_dump = component.model_dump(mode="json") + print(model_dump) + assert model_dump["function_data"]["proportional_term"] == 1.0 + + model_dump = component.model_dump(context={"magnitude_only": True}) + assert model_dump["function_data"]["proportional_term"] == 1.0 + + model_dump = component.model_dump(mode="json", context={"magnitude_only": True}) + assert model_dump["function_data"]["proportional_term"] == 1.0 + + +def test_function_data_serialization(tmp_path): + system = SimpleSystem(auto_add_composed_components=True) + + f1 = FunctionDataComponent( + name="test", function_data=LinearFunctionData(proportional_term=1.0, constant_term=2.0) + ) + system.add_component(f1) + filename = tmp_path / "function_data.json" + + system.to_json(filename, overwrite=True) + system2 = SimpleSystem.from_json(filename) + + assert system2 is not None + + f2 = system2.get_component(FunctionDataComponent, "test") + + assert f2 is not None + assert f1.function_data.proportional_term == f2.function_data.proportional_term + assert f1.function_data.constant_term == f2.function_data.constant_term + + +def test_running_sum(): + test_x = [1.0, 3.0, 6.0] + test_y = [2.0, 4.0] + + pws = PiecewiseStepData(x_coords=test_x, y_coords=test_y) + + points = running_sum(pws) + + x_values = [p.x for p in points] + y_values = [p.y for p in points] + + correct_y_values = [0.0, 4.0, 16.0] + + assert x_values == test_x + assert y_values == correct_y_values diff --git a/tests/test_values_curves.py b/tests/test_values_curves.py new file mode 100644 index 0000000..ca668c1 --- /dev/null +++ b/tests/test_values_curves.py @@ -0,0 +1,155 @@ +from infrasys.function_data import ( + LinearFunctionData, + QuadraticFunctionData, + PiecewiseStepData, +) +from infrasys.value_curves import ( + InputOutputCurve, + IncrementalCurve, + AverageRateCurve, +) +from infrasys import Component +from infrasys.exceptions import ISOperationNotAllowed +from .models.simple_system import SimpleSystem +import pytest + + +class ValueCurveComponent(Component): + value_curve: InputOutputCurve + + +def test_input_output_curve(): + curve = InputOutputCurve( + function_data=LinearFunctionData(proportional_term=1.0, constant_term=1.0) + ) + + assert isinstance(curve, InputOutputCurve) + assert isinstance(curve.function_data, LinearFunctionData) + + +def test_incremental_curve(): + curve = IncrementalCurve( + function_data=LinearFunctionData(proportional_term=1.0, constant_term=1.0), + initial_input=1.0, + ) + + assert isinstance(curve, IncrementalCurve) + assert isinstance(curve.function_data, LinearFunctionData) + + +def test_average_rate_curve(): + curve = AverageRateCurve( + function_data=LinearFunctionData(proportional_term=1.0, constant_term=1.0), + initial_input=1.0, + ) + + assert isinstance(curve, AverageRateCurve) + assert isinstance(curve.function_data, LinearFunctionData) + + +def test_average_rate_conversion(): + # Linear function data + curve = AverageRateCurve( + function_data=LinearFunctionData(proportional_term=1.0, constant_term=2.0), + initial_input=None, + ) + with pytest.raises(ISOperationNotAllowed): + curve.to_input_output() + + curve.initial_input = 0.0 + new_curve = curve.to_input_output() + assert isinstance(new_curve, InputOutputCurve) + assert isinstance(new_curve.function_data, QuadraticFunctionData) + assert new_curve.function_data.quadratic_term == 1.0 + + curve.function_data.proportional_term = 0.0 + new_curve = curve.to_input_output() + assert isinstance(new_curve, InputOutputCurve) + assert isinstance(new_curve.function_data, LinearFunctionData) + assert new_curve.function_data.proportional_term == 2.0 + + # Piecewise step data + data = PiecewiseStepData(x_coords=[1.0, 3.0, 5.0], y_coords=[2.0, 6.0]) + curve = AverageRateCurve(function_data=data, initial_input=None) + with pytest.raises(ISOperationNotAllowed): + curve.to_input_output() + + curve.initial_input = 0.0 + new_curve = curve.to_input_output() + assert isinstance(new_curve, InputOutputCurve) + + +def test_incremental_conversion(): + # Linear function data + curve = IncrementalCurve( + function_data=LinearFunctionData(proportional_term=1.0, constant_term=1.0), + initial_input=None, + ) + with pytest.raises(ISOperationNotAllowed): + curve.to_input_output() + curve.initial_input = 0.0 + new_curve = curve.to_input_output() + assert isinstance(new_curve, InputOutputCurve) + assert isinstance(new_curve.function_data, QuadraticFunctionData) + assert new_curve.function_data.quadratic_term == 0.5 + + curve.function_data.proportional_term = 0.0 + new_curve = curve.to_input_output() + assert isinstance(new_curve, InputOutputCurve) + assert isinstance(new_curve.function_data, LinearFunctionData) + assert new_curve.function_data.proportional_term == 1.0 + + # Piecewise step data + data = PiecewiseStepData(x_coords=[1.0, 3.0, 5.0], y_coords=[2.0, 6.0]) + curve = IncrementalCurve(function_data=data, initial_input=None) + with pytest.raises(ISOperationNotAllowed): + curve.to_input_output() + curve.initial_input = 0.0 + new_curve = curve.to_input_output() + assert isinstance(new_curve, InputOutputCurve) + + +def test_value_curve_custom_serialization(): + component = ValueCurveComponent( + name="test", + value_curve=InputOutputCurve( + function_data=LinearFunctionData(proportional_term=1.0, constant_term=2.0) + ), + ) + + model_dump = component.model_dump(mode="json") + assert model_dump["value_curve"]["function_data"]["proportional_term"] == 1.0 + + model_dump = component.model_dump(context={"magnitude_only": True}) + assert model_dump["value_curve"]["function_data"]["proportional_term"] == 1.0 + + model_dump = component.model_dump(mode="json", context={"magnitude_only": True}) + assert model_dump["value_curve"]["function_data"]["proportional_term"] == 1.0 + + +def test_value_curve_serialization(tmp_path): + system = SimpleSystem(auto_add_composed_components=True) + + v1 = ValueCurveComponent( + name="test", + value_curve=InputOutputCurve( + function_data=LinearFunctionData(proportional_term=1.0, constant_term=2.0) + ), + ) + system.add_component(v1) + filename = tmp_path / "value_curve.json" + + system.to_json(filename, overwrite=True) + system2 = SimpleSystem.from_json(filename) + + assert system2 is not None + + v2 = system2.get_component(ValueCurveComponent, "test") + + assert v2 is not None + assert isinstance(v1.value_curve.function_data, LinearFunctionData) + assert ( + v1.value_curve.function_data.proportional_term + == v2.value_curve.function_data.proportional_term + ) + assert v1.value_curve.function_data.constant_term == v2.value_curve.function_data.constant_term