diff --git a/src/datagen.py b/src/datagen.py index 6411fb8..bd4c3c9 100644 --- a/src/datagen.py +++ b/src/datagen.py @@ -54,80 +54,80 @@ class DataGenerator(tf.keras.utils.Sequence): # structure "n_bedrooms", "n_bathrooms", - "attic_type", - "sqft", - "foundation_type", - "garage_size_n_car", - "n_stories", - "orientation_degrees", - "roof_material", - "window_wall_ratio", - "window_ufactor", - "window_shgc", - # heating - "heating_fuel", - "heating_appliance_type", - "has_ducted_heating", - "heating_efficiency_nominal_percentage", - "heating_setpoint_degrees_f", - "heating_setpoint_offset_magnitude_degrees_f", - # cooling - "ac_type", - "cooled_space_percentage", - "cooling_efficiency_eer", - "cooling_setpoint_degrees_f", - "cooling_setpoint_offset_magnitude_degrees_f", - # water heater - "water_heater_fuel", - "water_heater_type", - "water_heater_tank_volume_gal", - "water_heater_efficiency_ef", - "water_heater_recovery_efficiency_ef", - "has_water_heater_in_unit", - # ducts - "has_ducts", - "duct_insulation_r_value", - "duct_leakage_percentage", - "infiltration_ach50", - # insulalation - "wall_material", - "insulation_wall_r_value", - "insulation_foundation_wall_r_value", - "insulation_slab_r_value", - "insulation_rim_joist_r_value", - "insulation_floor_r_value", - "insulation_ceiling_r_value", - "insulation_roof_r_value", - # building type - "is_attached", - "is_mobile_home", - "n_building_units", - "is_middle_unit", - "unit_level_in_building", - # other appliances - "has_ceiling_fan", - "clothes_dryer_fuel", - "clothes_washer_efficiency", - "cooking_range_fuel", - "dishwasher_efficiency_kwh", - "lighting_efficiency", - "refrigerator_extra_efficiency_ef", - "has_standalone_freezer", - "has_gas_fireplace", - "has_gas_grill", - "has_gas_lighting", - "has_well_pump", - "hot_tub_spa_fuel", - "pool_heater_fuel", - "refrigerator_efficiency_ef", - "plug_load_percentage", - "usage_level_appliances", - # misc - "climate_zone_temp", - "climate_zone_moisture", - 
"neighbor_distance_ft", - "n_occupants", - "vintage", + # "attic_type", + # "sqft", + # "foundation_type", + # "garage_size_n_car", + # "n_stories", + # "orientation_degrees", + # "roof_material", + # "window_wall_ratio", + # "window_ufactor", + # "window_shgc", + # # heating + # "heating_fuel", + # "heating_appliance_type", + # "has_ducted_heating", + # "heating_efficiency_nominal_percentage", + # "heating_setpoint_degrees_f", + # "heating_setpoint_offset_magnitude_degrees_f", + # # cooling + # "ac_type", + # "cooled_space_percentage", + # "cooling_efficiency_eer", + # "cooling_setpoint_degrees_f", + # "cooling_setpoint_offset_magnitude_degrees_f", + # # water heater + # "water_heater_fuel", + # "water_heater_type", + # "water_heater_tank_volume_gal", + # "water_heater_efficiency_ef", + # "water_heater_recovery_efficiency_ef", + # "has_water_heater_in_unit", + # # ducts + # "has_ducts", + # "duct_insulation_r_value", + # "duct_leakage_percentage", + # "infiltration_ach50", + # # insulation + # "wall_material", + # "insulation_wall_r_value", + # "insulation_foundation_wall_r_value", + # "insulation_slab_r_value", + # "insulation_rim_joist_r_value", + # "insulation_floor_r_value", + # "insulation_ceiling_r_value", + # "insulation_roof_r_value", + # # building type + # "is_attached", + # "is_mobile_home", + # "n_building_units", + # "is_middle_unit", + # "unit_level_in_building", + # # other appliances + # "has_ceiling_fan", + # "clothes_dryer_fuel", + # "clothes_washer_efficiency", + # "cooking_range_fuel", + # "dishwasher_efficiency_kwh", + # "lighting_efficiency", + # "refrigerator_extra_efficiency_ef", + # "has_standalone_freezer", + # "has_gas_fireplace", + # "has_gas_grill", + # "has_gas_lighting", + # "has_well_pump", + # "hot_tub_spa_fuel", + # "pool_heater_fuel", + # "refrigerator_efficiency_ef", + # "plug_load_percentage", + # "usage_level_appliances", + # # misc + # "climate_zone_temp", + # "climate_zone_moisture", + # "neighbor_distance_ft", + # 
"n_occupants", + # "vintage", # fuel indicators -- these must be present for post-processing to work!! "has_methane_gas_appliance", "has_fuel_oil_appliance", diff --git a/src/model_training.py b/src/model_training.py index 1487267..4125086 100644 --- a/src/model_training.py +++ b/src/model_training.py @@ -131,13 +131,17 @@ class SurrogateModelingWrapper(mlflow.pyfunc.PythonModel): - targets (list of str) : List of consumption group targets """ - def __init__(self, trained_model, building_features, weather_features, targets): + def load_context(self, context): + import tensorflow as tf + self.model = tf.keras.models.load_model(context.artifacts["tf_model"]) + + def __init__(self, building_features, weather_features, targets): """ Parameters: - trained_model: The trained mlflow keras model See class attributes for details on other params. """ - self.model = trained_model + # self.model = trained_model self.building_features = building_features self.weather_features = weather_features self.targets = targets @@ -269,20 +273,44 @@ def convert_feature_dataframe_to_dict( callbacks=[keras.callbacks.EarlyStopping(monitor="val_loss", patience=15)], ) + keras_model.save('model.keras') + # wrap in custom class that defines pre and post processing steps to be applied when called at inference time pyfunc_model = SurrogateModelingWrapper( - trained_model=keras_model, + #trained_model=keras_model, building_features=train_gen.building_features, weather_features=train_gen.weather_features, targets=train_gen.targets, ) + import cloudpickle + + conda_env = { + "channels": ["defaults"], + "dependencies": [ + f"python=3.11", + "pip", + { + "pip": [ + f"mlflow=={mlflow.__version__}", + f"tensorflow=={tf.__version__}", + f"cloudpickle=={cloudpickle.__version__}", + ], + }, + ], + "name": "tf_env", + } + + mlflow.pyfunc.log_model( python_model=pyfunc_model, artifact_path=sm.artifact_path, - code_paths=["surrogate_model.py"], + artifacts={"tf_model": 'model.keras'}, + conda_env=conda_env + 
# code_paths=["surrogate_model.py"], # signature=signature ) + # skip registering model for now.. # mlflow.register_model(f"runs:/{run_id}/{sm.artifact_path}", str(sm)) @@ -308,7 +336,11 @@ def convert_feature_dataframe_to_dict( print(run_id) # mlflow.pyfunc.get_model_dependencies(model_uri=sm.get_model_uri(run_id=run_id)) # Load the model using its registered name and version/stage from the MLflow model registry + +# To address: ValueError: The `{arg_name}` of this `Lambda` layer is a Python lambda. Deserializing it is unsafe. If you trust the source of the config artifact, you can override this error by passing `safe_mode=False` to `from_config()`, or calling `keras.config.enable_unsafe_deserialization(). +keras.config.enable_unsafe_deserialization() model_loaded = mlflow.pyfunc.load_model(model_uri=sm.get_model_uri(run_id=run_id)) + test_gen = DataGenerator(train_data=test_data.limit(10)) # load input data table as a Spark DataFrame input_data = test_gen.training_set.load_df().toPandas() @@ -320,3 +352,7 @@ def convert_feature_dataframe_to_dict( # DBTITLE 1,Pass Run ID to next notebook if running in job if not DEBUG: dbutils.jobs.taskValues.set(key="run_id", value=run_id) + +# COMMAND ---------- + + diff --git a/src/surrogate_model.py b/src/surrogate_model.py index 849d2bf..f8cd696 100644 --- a/src/surrogate_model.py +++ b/src/surrogate_model.py @@ -8,8 +8,7 @@ from pyspark.sql import DataFrame from pyspark.sql.types import ArrayType, DoubleType from tensorflow import keras -from tensorflow.keras import layers, models - +from tensorflow.keras import layers, models, saving from src.datagen import DataGenerator @@ -149,11 +148,35 @@ def conv_batchnorm_relu(x:tf.keras.layers, filters:int, kernel_size:int, name:st wm = conv_batchnorm_relu(wm, filters=16, kernel_size=8, name = "first") wm = conv_batchnorm_relu(wm, filters=8, kernel_size=8, name = "second") + # # Custom layer for summing across the second dimension + # @keras.saving.register_keras_serializable() + # 
class SumLayer(tf.keras.layers.Layer): + # def __init__(self, dtype, **kwargs): + # super().__init__(**kwargs) + # self.dtype = dtype + + # def call(self, inputs): + # return tf.reduce_sum(inputs, axis=1) + + # # def __init__(self, arg1, arg2, **kwargs): + # # super().__init__(**kwargs) + # # self.arg1 = arg1 + # # self.arg2 = arg2 + + # def get_config(self): + # config = super().get_config().copy() + # config.update({ + # "dtype": self.dtype, + # }) + # return config + + # # Use the custom SumLayer to sum across the time dimension + # wm = SumLayer(dtype=layer_params["dtype"])(wm) # sum the time dimension wm = layers.Lambda( lambda x: tf.keras.backend.sum(x, axis=1), dtype=layer_params["dtype"], - # output_shape = (8,) -- needed for tf v2.16.1 + output_shape = (8,) ##needed for tf v2.16.1 )(wm) wmo = models.Model( @@ -179,7 +202,8 @@ def conv_batchnorm_relu(x:tf.keras.layers, filters:int, kernel_size:int, name:st ) final_model.compile( - loss=masked_mae, + # loss=masked_mae, + loss="mse", optimizer="adam", # metrics=[mape], ) @@ -297,31 +321,31 @@ def mape(y_true, y_pred): return 100.0 * tf.keras.backend.mean(diff[y_true != 0], axis=-1) -@keras.saving.register_keras_serializable(package="my_package", name="masked_mae") -def masked_mae(y_true:tf.Tensor, y_pred:tf.Tensor) -> tf.Tensor: - """ - Calculate the Mean Absolute Error (MAE) between true and predicted values, ignoring those where y_true=0. +# @keras.saving.register_keras_serializable(package="my_package", name="masked_mae") +# def masked_mae(y_true:tf.Tensor, y_pred:tf.Tensor) -> tf.Tensor: +# """ +# Calculate the Mean Absolute Error (MAE) between true and predicted values, ignoring those where y_true=0. - This custom loss function is designed for scenarios where zero values in the true values are considered to be irrelevant and should not contribute to the loss calculation. It applies a mask to both the true and predicted values to exclude these zero entries before computing the MAE. 
The decorator allows this function to be serialized and logged alongside the keras model. +# This custom loss function is designed for scenarios where zero values in the true values are considered to be irrelevant and should not contribute to the loss calculation. It applies a mask to both the true and predicted values to exclude these zero entries before computing the MAE. The decorator allows this function to be serialized and logged alongside the keras model. - Args: - - y_true (tf.Tensor): The true values. - - y_pred (tf.Tensor): The predicted values. +# Args: +# - y_true (tf.Tensor): The true values. +# - y_pred (tf.Tensor): The predicted values. - Returns: - - tf.Tensor: The mean absolute error computed over non-zero true values. This is just a single scalar stored in a tensor. - """ - # Create a mask where targets are not zero - mask = tf.not_equal(y_true, 0) - - # Apply the mask to remove zero-target influence - y_true_masked = tf.boolean_mask(y_true, mask) - y_pred_masked = tf.boolean_mask(y_pred, mask) - - # Check if the masked tensor is empty - if tf.size(y_true_masked) == 0: - # Return zero as the loss if no elements to process - return tf.constant(0.0) - else: - # Calculate the mean absolute error on the masked data - return tf.reduce_mean(tf.abs(y_true_masked - y_pred_masked)) \ No newline at end of file +# Returns: +# - tf.Tensor: The mean absolute error computed over non-zero true values. This is just a single scalar stored in a tensor. 
+# """ +# # Create a mask where targets are not zero +# mask = tf.not_equal(y_true, 0) + +# # Apply the mask to remove zero-target influence +# y_true_masked = tf.boolean_mask(y_true, mask) +# y_pred_masked = tf.boolean_mask(y_pred, mask) + +# # Check if the masked tensor is empty +# if tf.size(y_true_masked) == 0: +# # Return zero as the loss if no elements to process +# return tf.constant(0.0) +# else: +# # Calculate the mean absolute error on the masked data +# return tf.reduce_mean(tf.abs(y_true_masked - y_pred_masked)) \ No newline at end of file