Create Release 1.16.1
Lakehouse Engine authored and jmcorreia committed Oct 12, 2023
1 parent bf2cf0b commit 5f40f14
Showing 12 changed files with 59 additions and 53 deletions.
4 changes: 2 additions & 2 deletions .github/pull_request_template.md
@@ -1,13 +1,13 @@
 - [ ] Description of PR changes above includes a link to [an existing GitHub issue](https://github.com/adidas/lakehouse-engine/issues)
 - [ ] PR title is prefixed with one of: [BUGFIX], [FEATURE]
-- [ ] Code is linted and tests -
-- [ ] Appropriate tests and docs have been updated
+- [ ] Code is linted and tested -
 ```
 make style
 make lint
 make test
 make test-security
 ```
+- [ ] Appropriate tests and docs have been updated
 
 For more information about contributing, see [Contribute](https://github.com/adidas/lakehouse-engine/blob/master/CONTRIBUTING.md).

2 changes: 1 addition & 1 deletion CONTRIBUTING.md
@@ -8,7 +8,7 @@
 
 💡 Do you have ideas for new features? Open a feature request on [GitHub](https://github.com/adidas/lakehouse-engine/issues).
 
-🚀 Want to find the available releases? Check our release notes on [GitHub](https://github.com/adidas/lakehouse-engine/releases) and [PyPi](https://fill_later).
+🚀 Want to find the available releases? Check our release notes on [GitHub](https://github.com/adidas/lakehouse-engine/releases) and [PyPi](https://pypi.org/project/lakehouse-engine/).
 
 ## Prerequisites
2 changes: 1 addition & 1 deletion LICENSE.txt
@@ -186,7 +186,7 @@
       same "printed page" as the copyright notice for easier
       identification within third-party archives.
 
-   Copyright [yyyy] [name of copyright owner]
+   Copyright 2023 adidas AG
 
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
Binary file modified assets/img/lakehouse_dp_usage.drawio.png
2 changes: 1 addition & 1 deletion cicd/.bumpversion.cfg
@@ -1,5 +1,5 @@
 [bumpversion]
-current_version = 1.16.0
+current_version = 1.16.1
 commit = False
 tag = False
4 changes: 2 additions & 2 deletions lakehouse_engine/algorithms/data_loader.py
@@ -34,7 +34,7 @@ class DataLoader(Algorithm):
     steps and configurations through a dict based configuration, which we name ACON
     in our framework.
 
-    Since an ACON is a dict you can pass a custom transformer trough a python function
+    Since an ACON is a dict you can pass a custom transformer through a python function
     and, therefore, the DataLoader can also be used to load data with custom
     transformations not provided in our transformers package.
@@ -248,7 +248,7 @@ def _get_transform_specs(self) -> List[TransformSpec]:
         in the same order that they appear in the list of transformations. This means
         that other supported transformations that appear after an
         unsupported one continue to stay one the normal execution plan,
-        i.e., outside the foreachBatch function. Therefore this may
+        i.e., outside the foreachBatch function. Therefore, this may
         make your algorithm to execute a different logic than the one you
         originally intended. For this reason:
         1) ALWAYS PLACE UNSUPPORTED STREAMING TRANSFORMATIONS AT LAST;
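Since this docstring is the main pointer to custom transformers, here is a minimal sketch of what passing one through an ACON could look like. The `transform_specs` layout and the `custom_transformation` transformer name follow the project's documented ACON conventions but are not part of this diff; spec ids and paths are illustrative.

```python
from pyspark.sql import DataFrame

from lakehouse_engine.engine import load_data


def drop_negative_quantities(df: DataFrame) -> DataFrame:
    """A custom transformation not provided in the transformers package."""
    return df.filter("quantity >= 0")


# Hypothetical ACON: spec ids, locations and transformer name are assumptions.
acon = {
    "input_specs": [
        {
            "spec_id": "sales_input",
            "read_type": "batch",
            "data_format": "csv",
            "location": "s3://my-bucket/sales/",
        }
    ],
    "transform_specs": [
        {
            "spec_id": "sales_cleaned",
            "input_id": "sales_input",
            "transformers": [
                {
                    "function": "custom_transformation",
                    # the python function is passed directly in the dict
                    "args": {"custom_transformer": drop_negative_quantities},
                }
            ],
        }
    ],
    "output_specs": [
        {
            "spec_id": "sales_output",
            "input_id": "sales_cleaned",
            "write_type": "overwrite",
            "data_format": "delta",
            "location": "s3://my-bucket/sales_cleaned/",
        }
    ],
}

load_data(acon=acon)
```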
6 changes: 3 additions & 3 deletions lakehouse_engine/algorithms/reconciliator.py
@@ -54,14 +54,14 @@ class Reconciliator(Executable):
     Finally, it aggregates the differences, using the supplied aggregation function
     (e.g., sum, avg, min, max, etc).
 
-    All of this configurations are passed via the ACON to instantiate a
+    All of these configurations are passed via the ACON to instantiate a
     ReconciliatorSpec object.
 
     Notes:
     - It is crucial that both the current and truth datasets have exactly the same
     structure.
     - You should not use 0 as yellow or red threshold, as the algorithm will verify
-    if the difference between the truth and current values is bigger or bigger
+    if the difference between the truth and current values is bigger
     or equal than those thresholds.
     - The reconciliation does not produce any negative values or percentages, as we
     use the absolute value of the differences. This means that the recon result
@@ -135,7 +135,7 @@ def execute(self) -> None:
         status = "green"
 
         # if ignore_empty_df is true, run empty check on truth_df and current_results_df
-        # if both the dataframe is empty then exit with green
+        # if both the dataframes are empty then exit with green
         if (
             self.spec.ignore_empty_df
             and truth_df.isEmpty()
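The zero-threshold warning in the docstring above follows directly from the "bigger or equal" comparison it describes. A toy illustration, with made-up variable names rather than the Reconciliator's internals:

```python
# With a ">=" check, a threshold of 0 fires even when truth and current match.
truth_value, current_value = 100.0, 100.0
diff = abs(truth_value - current_value)  # recon works on absolute differences

yellow_threshold = 0  # the value the docstring warns against
status = "yellow" if diff >= yellow_threshold else "green"
assert status == "yellow"  # 0 >= 0: a perfect match would still be flagged
```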
62 changes: 29 additions & 33 deletions lakehouse_engine/algorithms/sensor.py
@@ -7,8 +7,9 @@
     SensorAlreadyExistsException,
 )
 from lakehouse_engine.core.definitions import (
-    FILE_INPUT_FORMATS,
+    SENSOR_ALLOWED_DATA_FORMATS,
     InputFormat,
+    ReadType,
     SensorSpec,
     SensorStatus,
 )
@@ -31,6 +32,7 @@ def __init__(self, acon: dict):
             acon: algorithm configuration.
         """
         self.spec: SensorSpec = SensorSpec.create_from_acon(acon=acon)
+        self._validate_sensor_spec()
 
         if self._check_if_sensor_already_exists():
             raise SensorAlreadyExistsException(
@@ -41,31 +43,13 @@ def execute(self) -> bool:
         """Execute the sensor."""
         self._LOGGER.info(f"Starting {self.spec.input_spec.data_format} sensor...")
 
-        if InputFormat.exists(self.spec.input_spec.data_format):
-            new_data_df = SensorUpstreamManager.read_new_data(sensor_spec=self.spec)
-            if (
-                self.spec.input_spec.data_format == InputFormat.KAFKA.value
-                or self.spec.input_spec.data_format in FILE_INPUT_FORMATS
-                or (
-                    self.spec.input_spec.db_table is not None
-                    and self.spec.input_spec.data_format != InputFormat.JDBC.value
-                )
-            ):
-                Sensor._run_streaming_sensor(
-                    sensor_spec=self.spec, new_data_df=new_data_df
-                )
-            elif self.spec.input_spec.data_format == InputFormat.JDBC.value:
-                Sensor._run_batch_sensor(
-                    sensor_spec=self.spec,
-                    new_data_df=new_data_df,
-                )
-            else:
-                raise NotImplementedError(
-                    "A sensor has not been implemented yet for this data format."
-                )
-        else:
-            raise NotImplementedError(
-                f"Data format {self.spec.input_spec.data_format} isn't implemented yet."
-            )
+        new_data_df = SensorUpstreamManager.read_new_data(sensor_spec=self.spec)
+        if self.spec.input_spec.read_type == ReadType.STREAMING.value:
+            Sensor._run_streaming_sensor(sensor_spec=self.spec, new_data_df=new_data_df)
+        elif self.spec.input_spec.read_type == ReadType.BATCH.value:
+            Sensor._run_batch_sensor(
+                sensor_spec=self.spec,
+                new_data_df=new_data_df,
+            )
 
         has_new_data = SensorControlTableManager.check_if_sensor_has_acquired_data(
@@ -106,13 +90,6 @@ def _run_streaming_sensor(
         cls, sensor_spec: SensorSpec, new_data_df: DataFrame
     ) -> None:
         """Run sensor in streaming mode (internally runs in batch mode)."""
-        if not new_data_df.isStreaming:
-            raise RuntimeError(
-                f"The sensor {sensor_spec.sensor_id} for the requested data "
-                "format needs to be executed "
-                "in streaming mode! Please change the read type of the input "
-                "spec."
-            )
 
         def foreach_batch_check_new_data(df: DataFrame, batch_id: int) -> None:
             Sensor._run_batch_sensor(
@@ -162,3 +139,22 @@ def _run_batch_sensor(
             f"Successfully updated sensor status for sensor "
             f"{sensor_spec.sensor_id}..."
         )
+
+    def _validate_sensor_spec(self) -> None:
+        """Validate if sensor spec Read Type is allowed for the selected Data Format."""
+        if InputFormat.exists(self.spec.input_spec.data_format):
+            if (
+                self.spec.input_spec.data_format
+                not in SENSOR_ALLOWED_DATA_FORMATS[self.spec.input_spec.read_type]
+            ):
+                raise NotImplementedError(
+                    f"A sensor has not been implemented yet for this data format or, "
+                    f"this data format is not available for the read_type"
+                    f" {self.spec.input_spec.read_type}. "
+                    f"Check the allowed combinations of read_type and data_formats:"
+                    f" {SENSOR_ALLOWED_DATA_FORMATS}"
+                )
+        else:
+            raise NotImplementedError(
+                f"Data format {self.spec.input_spec.data_format} isn't implemented yet."
+            )
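Net effect of this refactor: `execute` now dispatches on `read_type` instead of inferring batch versus streaming from the data format, and `_validate_sensor_spec` rejects disallowed format/read_type pairs up front. A minimal sketch of driving the new code paths; the ACON fields mirror the test fixtures further down in this diff, the values are illustrative, and a real JDBC source would additionally need connection options (an assumption, not shown in this diff):

```python
from lakehouse_engine.algorithms.sensor import Sensor

# Illustrative ACON, modeled on the fixtures in tests/unit/test_sensor.py.
acon = {
    "sensor_id": "sensor_id_1",
    "assets": ["asset_1"],
    "control_db_table_name": "control_sensor_table_name",
    "input_spec": {
        "spec_id": "input_spec",
        "read_type": "batch",   # ReadType.BATCH.value -> _run_batch_sensor
        "data_format": "jdbc",  # allowed for batch in SENSOR_ALLOWED_DATA_FORMATS
    },
}

# __init__ now calls _validate_sensor_spec, so an invalid format/read_type
# pair fails fast with NotImplementedError before any data is read.
has_new_data = Sensor(acon).execute()
```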
8 changes: 8 additions & 0 deletions lakehouse_engine/core/definitions.py
@@ -703,6 +703,14 @@ class SensorStatus(Enum):
     "sensors.status_change_timestamp": "updates.status_change_timestamp",
 }
 
+SENSOR_ALLOWED_DATA_FORMATS = {
+    ReadType.STREAMING.value: [InputFormat.KAFKA.value, *FILE_INPUT_FORMATS],
+    ReadType.BATCH.value: [
+        InputFormat.DELTAFILES.value,
+        InputFormat.JDBC.value,
+    ],
+}
+
 
 class SAPLogchain(Enum):
     """Defaults used on consuming data from SAP Logchain."""
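For reference, the expected error message in the updated unit test below spells out what this new constant resolves to at runtime:

```python
# Resolved mapping, as echoed by the expected NotImplementedError message in
# tests/unit/test_sensor.py later in this diff:
SENSOR_ALLOWED_DATA_FORMATS = {
    "streaming": ["kafka", "avro", "json", "parquet", "csv", "delta", "cloudfiles"],
    "batch": ["delta", "jdbc"],
}
```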
10 changes: 5 additions & 5 deletions lakehouse_engine/core/sensor_manager.py
@@ -33,7 +33,7 @@ def check_if_sensor_has_acquired_data(
         """Check if sensor has acquired new data.
 
         Args:
-            sensor_id: sendor id.
+            sensor_id: sensor id.
             control_db_table_name: db.table to control sensor runs.
 
         Returns:
@@ -158,8 +158,8 @@ def _convert_sensor_to_data(
                 "status": status,
                 "status_change_timestamp": status_change_timestamp,
                 "checkpoint_location": spec.checkpoint_location,
-                "upstream_key": upstream_key,
-                "upstream_value": upstream_value,
+                "upstream_key": str(upstream_key),
+                "upstream_value": str(upstream_value),
             }
         ]
 
@@ -195,7 +195,7 @@ def read_sensor_table_data(
         Args:
             sensor_id: sensor id. If this parameter is defined search occurs
-                only considering this parameter. Otherwise it considers sensor
+                only considering this parameter. Otherwise, it considers sensor
                 assets and checkpoint location.
             control_db_table_name: db.table to control sensor runs.
             assets: list of assets that are fueled by the pipeline
@@ -333,7 +333,7 @@ def read_new_data(cls, sensor_spec: SensorSpec) -> DataFrame:
             sensor_spec: sensor spec containing all sensor information.
 
         Return:
-            An empty dataframe if doesn't have new data otherwise the new data
+            An empty dataframe if it doesn't have new data otherwise the new data
         """
         new_data_df = ReaderFactory.get_data(sensor_spec.input_spec)
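A plausible reading of the `str(...)` casts in `_convert_sensor_to_data`, assuming the control table stores `upstream_key`/`upstream_value` as strings (this commit does not say so explicitly): without the casts, a non-string upstream value would change the type of the row data being written.

```python
# Illustrative only; names and the string-typed control table are assumptions.
upstream_key, upstream_value = "salesorder_date", 20231012  # e.g. an int watermark
row = {"upstream_key": str(upstream_key), "upstream_value": str(upstream_value)}
assert row["upstream_value"] == "20231012"  # consistent string type either way
```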
2 changes: 1 addition & 1 deletion setup.py
@@ -14,7 +14,7 @@
 
 setuptools.setup(
     name="lakehouse-engine",
-    version="1.16.0",
+    version="1.16.1",
     author="Adidas Lakehouse Foundations Team",
     author_email="[email protected]",
     description="A Spark framework serving as the engine for several lakehouse "
10 changes: 6 additions & 4 deletions tests/unit/test_sensor.py
@@ -346,7 +346,6 @@ def test_execute_stream_sensor(self, scenario: dict, capsys: Any) -> None:
                 "read_type": ReadType.BATCH.value,
                 "data_format": InputFormat.JDBC.value,
             },
-            "base_checkpoint_location": "s3://dummy-bucket",
         },
         "expected_result": True,
     },
@@ -403,8 +402,12 @@ def test_execute_batch_sensor(self, scenario: dict, capsys: Any) -> None:
             },
             "base_checkpoint_location": "s3://dummy-bucket",
         },
-        "expected_result": "A sensor has not been "
-        "implemented yet for this data format.",
+        "expected_result": "A sensor has not been implemented yet for "
+        "this data format or, this data format is not available for "
+        "the read_type batch. Check the allowed combinations of "
+        "read_type and data_formats: {'streaming': ['kafka', 'avro', "
+        "'json', 'parquet', 'csv', 'delta', "
+        "'cloudfiles'], 'batch': ['delta', 'jdbc']}",
     },
     {
         "scenario_name": "raise_exception_sensor_"
@@ -448,7 +451,6 @@ def test_execute_sensor_raise_no_input_spec_format_implemented(
             "sensor_id": "sensor_id_1",
             "assets": ["asset_1"],
             "control_db_table_name": "control_sensor_table_name",
-            "control_db_table_name": "control_sensor_table_name",
             "input_spec": {
                 "spec_id": "input_spec",
                 "read_type": ReadType.STREAMING.value,
