Skip to content

Commit

Permalink
Updated the Azure blob-storage processor
Browse files Browse the repository at this point in the history
  • Loading branch information
beatfactor committed Aug 8, 2024
1 parent 01063bc commit 57317c1
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 8 deletions.
5 changes: 5 additions & 0 deletions oceanstream/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
'distributed.comm.retry.count': 0
})


def initialize(settings, file_path, log_level=None):
config_data = load_config(settings["config"])
config_data["raw_path"] = file_path
Expand Down Expand Up @@ -103,6 +104,8 @@ def convert(
output: str = typer.Option(None,
help="Destination path for saving Zarr converted data. Defaults to a predefined "
"directory if not specified."),
sonar_model: str = typer.Option(None, help="Sonar model used to collect the data",
show_choices=["AZFP", "EK60", "ES70", "EK80", "ES80", "EA640", "AD2CP"]),
workers_count: int = typer.Option(os.cpu_count(), help="Number of CPU workers to use for processing"),
config: str = typer.Option(None, help="Path to a configuration file"),
log_level: str = typer.Option("WARNING", help="Set the logging level",
Expand All @@ -113,6 +116,7 @@ def convert(
"""
settings = {
"config": config,
"sonar_model": sonar_model,
"output_folder": output or DEFAULT_OUTPUT_FOLDER
}

Expand All @@ -122,6 +126,7 @@ def convert(
try:
if filePath.is_file():
from oceanstream.process import convert_raw_file
print(f"[blue]Converting raw file {source} to Zarr...[/blue]")
convert_raw_file(filePath, configData)
print(
f"[blue]✅ Converted raw file {source} to Zarr and wrote output to: {configData['output_folder']} [/blue]")
Expand Down
1 change: 1 addition & 0 deletions oceanstream/exports/csv/csv_export_from_Sv.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ def create_location(data: xr.Dataset, epsilon=0.00001, min_distance=0.01) -> pd.

# Apply Ramer-Douglas-Peucker algorithm for thinning coordinates
points = df[["lat", "lon"]].values

thinned_points = ramer_douglas_peucker(points, epsilon)

# Create a thinned DataFrame
Expand Down
24 changes: 23 additions & 1 deletion oceanstream/process/azure/blob_storage.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,18 @@
import os

import echopype as ep
from adlfs import AzureBlobFileSystem


def list_zarr_files(azfs, path):
def list_zarr_files(path, azfs=None):
"""List all Zarr files in the Azure Blob Storage container along with their metadata."""

if azfs is None:
azfs = get_azfs()

if azfs is None:
raise ValueError("Azure Blob Storage connection string not found and no azfs instance was specified.")

zarr_files = []
for blob in azfs.ls(path, detail=True):
if blob['type'] == 'directory' and not blob['name'].endswith('.zarr'):
Expand All @@ -18,6 +28,18 @@ def list_zarr_files(azfs, path):
return zarr_files


def get_azfs():
    """Build an ``AzureBlobFileSystem`` from the connection string in the environment.

    Reads ``AZURE_STORAGE_CONNECTION_STRING`` via ``os.getenv``. Returns the
    filesystem object on success, or ``None`` when the variable is unset or
    empty so callers can fall back to an explicitly supplied instance.
    """
    conn_str = os.getenv('AZURE_STORAGE_CONNECTION_STRING')

    # No connection string configured: signal with None rather than raising,
    # so the caller decides how to handle the missing credentials.
    if conn_str:
        return AzureBlobFileSystem(connection_string=conn_str)

    return None


def open_zarr_store(azfs, store_name, chunks=None):
"""Open a Zarr store from Azure Blob Storage."""
mapper = azfs.get_mapper(store_name)
Expand Down
26 changes: 19 additions & 7 deletions oceanstream/process/file_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,21 +229,34 @@ def convert_raw_file(file_path, config_data, base_path=None, progress_queue=None
relative_path = file_path_obj.relative_to(base_path)
relative_path = relative_path.parent
else:
relative_path = file_path_obj.name
relative_path = None

echodata, encode_mode = read_file(file_config_data, use_swap=True, skip_integrity_check=True)
file_name = file_path_obj.stem + ".zarr"

if 'cloud_storage' in config_data:
file_name = file_path_obj.stem + ".zarr"
store = _get_chunk_store(config_data['cloud_storage'], Path(relative_path) / file_name)
if relative_path:
file_location = Path(relative_path) / file_name
else:
file_location = file_name
store = _get_chunk_store(config_data['cloud_storage'], file_location)
echodata.to_zarr(save_path=store, overwrite=True, parallel=False)
output_zarr_path = store
else:
output_path = Path(config_data["output_folder"]) / relative_path
output_path.mkdir(parents=True, exist_ok=True)
echodata.to_zarr(save_path=output_path, overwrite=True, parallel=False)
if relative_path:
output_path = Path(config_data["output_folder"]) / relative_path
output_path.mkdir(parents=True, exist_ok=True)
else:
output_path = Path(config_data["output_folder"])

output_zarr_path = output_path / file_name
echodata.to_zarr(save_path=output_zarr_path, overwrite=True, parallel=False)

if progress_queue:
progress_queue.put(file_path)

return output_zarr_path

except Exception as e:
logging.error("Error processing file %s", file_path)
print(Traceback())
Expand All @@ -264,7 +277,6 @@ def _get_chunk_store(storage_config, path):
azfs = AzureBlobFileSystem(**storage_config['storage_options'])

return azfs.get_mapper(f"{storage_config['container_name']}/{path}")

else:
raise ValueError(f"Unsupported storage type: {storage_config['storage_type']}")

Expand Down
3 changes: 3 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,7 @@ numpy
pydantic
rich
tqdm
matplotlib
haversine
IPython
git+https://github.com/OceanStreamIO/echopype-dev.git@oceanstream#egg=echopype

0 comments on commit 57317c1

Please sign in to comment.