diff --git a/CHANGELOG.md b/CHANGELOG.md index 873fdec35..8df156614 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,14 +9,22 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added + - Processes that take a CRS as argument now try harder to convert your input into a proper EPSG code, to avoid unexpected results when an invalid argument gets sent to the backend. +- Initial `load_geojson` support with `Connection.load_geojson()` ([#424](https://github.com/Open-EO/openeo-python-client/issues/424)) +- Initial `load_url` (for vector cubes) support with `Connection.load_url()` ([#424](https://github.com/Open-EO/openeo-python-client/issues/424)) + ### Changed +- `Connection` based requests: always use finite timeouts by default (20 minutes in general, 30 minutes for synchronous execute requests) + ([#454](https://github.com/Open-EO/openeo-python-client/issues/454)) + ### Removed ### Fixed +- Fix: `MultiBackendJobManager` should stop when finished, also when a job finishes with error ([#432](https://github.com/Open-EO/openeo-python-client/issues/432)) ## [0.21.1] - 2023-07-19 diff --git a/docs/api.rst b/docs/api.rst index 78fd290b3..097633709 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -47,6 +47,13 @@ openeo.rest.mlmodel :inherited-members: +openeo.metadata +---------------- + +.. automodule:: openeo.metadata + :members: CollectionMetadata, BandDimension, SpatialDimension, TemporalDimension + + openeo.api.process -------------------- diff --git a/docs/process_mapping.rst b/docs/process_mapping.rst index f25eef6ad..60519285c 100644 --- a/docs/process_mapping.rst +++ b/docs/process_mapping.rst @@ -27,7 +27,7 @@ method or function in the openEO Python Client Library. * - `aggregate_spatial `_ - :py:meth:`ProcessBuilder.aggregate_spatial() `, :py:meth:`aggregate_spatial() `, :py:meth:`DataCube.aggregate_spatial() ` * - `aggregate_spatial_window `_ - - :py:meth:`ProcessBuilder.aggregate_spatial_window() `, :py:meth:`aggregate_spatial_window() ` + - :py:meth:`ProcessBuilder.aggregate_spatial_window() `, :py:meth:`aggregate_spatial_window() `, :py:meth:`DataCube.aggregate_spatial_window() ` * - `aggregate_temporal `_ - :py:meth:`ProcessBuilder.aggregate_temporal() `, :py:meth:`aggregate_temporal() `, :py:meth:`DataCube.aggregate_temporal() ` * - `aggregate_temporal_period `_ @@ -191,11 +191,15 @@ method or function in the openEO Python Client Library. * - `ln `_ - :py:meth:`ProcessBuilder.ln() `, :py:meth:`ln() `, :py:meth:`DataCube.ln() ` * - `load_collection `_ - - :py:meth:`ProcessBuilder.load_collection() `, :py:meth:`load_collection() `, :py:meth:`DataCube.load_collection() ` + - :py:meth:`ProcessBuilder.load_collection() `, :py:meth:`load_collection() `, :py:meth:`DataCube.load_collection() `, :py:meth:`Connection.load_collection() ` + * - `load_geojson `_ + - :py:meth:`VectorCube.load_geojson() `, :py:meth:`Connection.load_geojson() ` * - `load_ml_model `_ - :py:meth:`ProcessBuilder.load_ml_model() `, :py:meth:`load_ml_model() `, :py:meth:`MlModel.load_ml_model() ` * - `load_result `_ - - :py:meth:`ProcessBuilder.load_result() `, :py:meth:`load_result() ` + - :py:meth:`ProcessBuilder.load_result() `, :py:meth:`load_result() `, :py:meth:`Connection.load_result() ` + * - `load_stac `_ + - :py:meth:`Connection.load_stac() ` * - `load_uploaded_files `_ - :py:meth:`ProcessBuilder.load_uploaded_files() `, :py:meth:`load_uploaded_files() ` * - `log `_ - @@ -325,4 +329,4 @@ 
* - `xor `_ - :py:meth:`ProcessBuilder.xor() `, :py:meth:`xor() ` -:subscript:`(Table autogenerated on 2023-03-15)` +:subscript:`(Table autogenerated on 2023-08-07)` diff --git a/openeo/api/process.py b/openeo/api/process.py index f8ae0bf66..405828f24 100644 --- a/openeo/api/process.py +++ b/openeo/api/process.py @@ -45,6 +45,19 @@ def raster_cube(cls, name: str = "data", description: str = "A data cube.") -> ' """ return cls(name=name, description=description, schema={"type": "object", "subtype": "raster-cube"}) + @classmethod + def datacube(cls, name: str = "data", description: str = "A data cube.") -> "Parameter": + """ + Helper to easily create a 'datacube' parameter. + + :param name: name of the parameter. + :param description: description of the parameter + :return: Parameter + + .. versionadded:: 0.22.0 + """ + return cls(name=name, description=description, schema={"type": "object", "subtype": "datacube"}) + @classmethod def string(cls, name: str, description: str = None, default=_DEFAULT_UNDEFINED, values=None) -> 'Parameter': """Helper to create a 'string' type parameter.""" diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py index b050a7031..2dedf64fb 100644 --- a/openeo/extra/job_management.py +++ b/openeo/extra/job_management.py @@ -256,6 +256,7 @@ def run_jobs( (df.status != "finished") & (df.status != "skipped") & (df.status != "start_failed") + & (df.status != "error") ].size > 0 ): diff --git a/openeo/metadata.py b/openeo/metadata.py index 3cdb06323..dd1e294d9 100644 --- a/openeo/metadata.py +++ b/openeo/metadata.py @@ -38,9 +38,9 @@ def rename_labels(self, target, source) -> 'Dimension': """ Rename labels, if the type of dimension allows it. - @param target: List of target labels - @param source: Source labels, or empty list - @return: A new dimension with modified labels, or the same if no change is applied. + :param target: List of target labels + :param source: Source labels, or empty list + :return: A new dimension with modified labels, or the same if no change is applied. """ # In general, we don't have/manage label info here, so do nothing. 
return Dimension(type=self.type, name=self.name) @@ -104,6 +104,7 @@ def common_names(self) -> List[str]: def band_index(self, band: Union[int, str]) -> int: """ Resolve a given band (common) name/index to band index + :param band: band name, common name or index :return int: band index """ @@ -446,4 +447,5 @@ def _repr_html_(self): return render_component('collection', data=self._orig_metadata) def __str__(self) -> str: + bands = self.band_names if self.has_band_dimension() else "no bands dimension" - return f"CollectionMetadata({self.extent} - {self.band_names} - {self.dimension_names()})" + return f"CollectionMetadata({self.extent} - {bands} - {self.dimension_names()})" diff --git a/openeo/rest/_testing.py b/openeo/rest/_testing.py new file mode 100644 index 000000000..73458f82a --- /dev/null +++ b/openeo/rest/_testing.py @@ -0,0 +1,98 @@ +import re + +from openeo import Connection + + +class DummyBackend: + """ + Dummy backend that handles sync/batch execution requests + and allows inspection of posted process graphs + """ + + # Default result (can serve both as JSON or binary data) + DEFAULT_RESULT = b'{"what?": "Result data"}' + + def __init__(self, requests_mock, connection: Connection): + self.connection = connection + self.sync_requests = [] + self.batch_jobs = {} + self.next_result = self.DEFAULT_RESULT + requests_mock.post(connection.build_url("/result"), content=self._handle_post_result) + requests_mock.post(connection.build_url("/jobs"), content=self._handle_post_jobs) + requests_mock.post( + re.compile(connection.build_url(r"/jobs/(job-\d+)/results$")), content=self._handle_post_job_results + ) + requests_mock.get(re.compile(connection.build_url(r"/jobs/(job-\d+)$")), json=self._handle_get_job) + requests_mock.get( + re.compile(connection.build_url(r"/jobs/(job-\d+)/results$")), json=self._handle_get_job_results + ) + requests_mock.get( + re.compile(connection.build_url("/jobs/(.*?)/results/result.data$")), + content=self._handle_get_job_result_asset, + ) + + def _handle_post_result(self, request, context): + """Handler of `POST /result` (synchronous execute).""" + pg = request.json()["process"]["process_graph"] + self.sync_requests.append(pg) + return self.next_result + + def _handle_post_jobs(self, request, context): + """Handler of `POST /jobs` (create batch job).""" + pg = request.json()["process"]["process_graph"] + job_id = f"job-{len(self.batch_jobs):03d}" + self.batch_jobs[job_id] = {"job_id": job_id, "pg": pg, "status": "created"} + context.status_code = 201 + context.headers["openeo-identifier"] = job_id + + def _get_job_id(self, request) -> str: + match = re.match(r"^/jobs/(job-\d+)(/|$)", request.path) + if not match: + raise ValueError(f"Failed to extract job_id from {request.path}") + job_id = match.group(1) + assert job_id in self.batch_jobs + return job_id + + def _handle_post_job_results(self, request, context): + """Handler of `POST /jobs/{job_id}/results` (start batch job).""" + job_id = self._get_job_id(request) + assert self.batch_jobs[job_id]["status"] == "created" + # TODO: support custom status sequence (instead of directly going to status "finished")? 
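+        # (One hypothetical shape for that, not implemented here: let a test preload
+        #     self.batch_jobs[job_id]["status_sequence"] = ["queued", "running", "finished"]
+        # and have _handle_get_job pop the next status from that list on every poll.)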
+ self.batch_jobs[job_id]["status"] = "finished" + context.status_code = 202 + + def _handle_get_job(self, request, context): + """Handler of `GET /jobs/{job_id}` (get batch job status and metadata).""" + job_id = self._get_job_id(request) + return {"id": job_id, "status": self.batch_jobs[job_id]["status"]} + + def _handle_get_job_results(self, request, context): + """Handler of `GET /jobs/{job_id}/results` (list batch job results).""" + job_id = self._get_job_id(request) + assert self.batch_jobs[job_id]["status"] == "finished" + return { + "id": job_id, + "assets": {"result.data": {"href": self.connection.build_url(f"/jobs/{job_id}/results/result.data")}}, + } + + def _handle_get_job_result_asset(self, request, context): + """Handler of `GET /jobs/{job_id}/results/result.data` (get batch job result asset).""" + job_id = self._get_job_id(request) + assert self.batch_jobs[job_id]["status"] == "finished" + return self.next_result + + def get_sync_pg(self) -> dict: + """Get the one and only synchronous process graph.""" + assert len(self.sync_requests) == 1 + return self.sync_requests[0] + + def get_batch_pg(self) -> dict: + """Get the one and only batch process graph.""" + assert len(self.batch_jobs) == 1 + return self.batch_jobs[max(self.batch_jobs.keys())]["pg"] + + def get_pg(self) -> dict: + """Get the one and only process graph (sync or batch).""" + pgs = self.sync_requests + [b["pg"] for b in self.batch_jobs.values()] + assert len(pgs) == 1 + return pgs[0] diff --git a/openeo/rest/connection.py b/openeo/rest/connection.py index faf80862c..535212764 100644 --- a/openeo/rest/connection.py +++ b/openeo/rest/connection.py @@ -15,10 +15,12 @@ import requests from requests import Response from requests.auth import HTTPBasicAuth, AuthBase +import shapely.geometry.base import openeo from openeo.capabilities import ApiVersionException, ComparableVersion from openeo.config import get_config_option, config_log +from openeo.internal.documentation import openeo_process from openeo.internal.graph_building import PGNode, as_flat_graph, FlatGraphableMixin from openeo.internal.jupyter import VisualDict, VisualList from openeo.internal.processes.builder import ProcessBuilderBase @@ -51,6 +53,11 @@ _log = logging.getLogger(__name__) +# Default timeouts for requests +# TODO: get default_timeout from config? 
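+# (Hypothetical sketch for that TODO, using the config helper that is already imported above:
+#  DEFAULT_TIMEOUT = int(get_config_option("default_timeout", default=20 * 60)))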
+DEFAULT_TIMEOUT = 20 * 60 +DEFAULT_TIMEOUT_SYNCHRONOUS_EXECUTE = 30 * 60 + class RestApiConnection: """Base connection class implementing generic REST API request functionality""" @@ -66,7 +73,7 @@ def __init__( self._root_url = root_url self.auth = auth or NullAuth() self.session = session or requests.Session() - self.default_timeout = default_timeout + self.default_timeout = default_timeout or DEFAULT_TIMEOUT self.default_headers = { "User-Agent": "openeo-python-client/{cv} {py}/{pv} {pl}".format( cv=openeo.client_version(), @@ -795,6 +802,9 @@ def capabilities(self) -> RESTCapabilities: load=lambda: RESTCapabilities(data=self.get('/', expected_status=200).json(), url=self._orig_url) ) + def list_input_formats(self) -> dict: + return self.list_file_formats().get("input", {}) + def list_output_formats(self) -> dict: return self.list_file_formats().get("output", {}) @@ -1090,12 +1100,13 @@ def datacube_from_json(self, src: Union[str, Path], parameters: Optional[dict] = """ return self.datacube_from_flat_graph(load_json_resource(src), parameters=parameters) + @openeo_process def load_collection( self, collection_id: str, spatial_extent: Optional[Dict[str, float]] = None, - temporal_extent: Optional[List[Union[str, datetime.datetime, datetime.date]]] = None, - bands: Optional[List[str]] = None, + temporal_extent: Optional[List[Union[str, datetime.datetime, datetime.date]]] = None, + bands: Optional[List[str]] = None, properties: Optional[Dict[str, Union[str, PGNode, Callable]]] = None, max_cloud_cover: Optional[float] = None, fetch_metadata=True, @@ -1126,6 +1137,7 @@ def load_collection( load_collection, name="imagecollection", since="0.4.10" ) + @openeo_process def load_result( self, id: str, @@ -1163,6 +1175,7 @@ def load_result( cube.metadata = metadata return cube + @openeo_process def load_stac( self, url: str, @@ -1300,6 +1313,53 @@ def load_ml_model(self, id: Union[str, BatchJob]) -> "MlModel": """ return MlModel.load_ml_model(connection=self, id=id) + @openeo_process + def load_geojson( + self, + data: Union[dict, str, Path, shapely.geometry.base.BaseGeometry, Parameter], + properties: Optional[List[str]] = None, + ): + """ + Converts GeoJSON data as defined by RFC 7946 into a vector data cube. + + :param data: the geometry to load. One of: + + - GeoJSON-style data structure: e.g. a dictionary with ``"type": "Polygon"`` and ``"coordinates"`` fields + - a path to a local GeoJSON file + - a GeoJSON string + - a shapely geometry object + + :param properties: A list of properties from the GeoJSON file to construct an additional dimension from. + :return: new VectorCube instance + + .. warning:: EXPERIMENTAL: this process is experimental with the potential for major things to change. + + .. versionadded:: 0.22.0 + """ + + return VectorCube.load_geojson(connection=self, data=data, properties=properties) + + @openeo_process + def load_url(self, url: str, format: str, options: Optional[dict] = None): + """ + Loads a file from a URL + + :param url: The URL to read from. Authentication details such as API keys or tokens may need to be included in the URL. + :param format: The file format to use when loading the data. + :param options: The file format parameters to use when reading the data. + Must correspond to the parameters that the server reports as supported parameters for the chosen ``format`` + :return: new VectorCube instance + + .. warning:: EXPERIMENTAL: this process is experimental with the potential for major things to change. + + .. 
versionadded:: 0.22.0 + """ + if format not in self.list_input_formats(): + # TODO: make this an error? + _log.warning(f"Format {format!r} not listed in back-end input formats") + # TODO: Inspect format's gis_data_type to see if we need to load a VectorCube or classic raster DataCube + return VectorCube.load_url(connection=self, url=url, format=format, options=options) + def create_service(self, graph: dict, type: str, **kwargs) -> Service: # TODO: type hint for graph: is it a nested or a flat one? req = self._build_request_with_process_graph(process_graph=graph, type=type, **kwargs) @@ -1388,7 +1448,7 @@ def download( self, graph: Union[dict, FlatGraphableMixin, str, Path], outputfile: Union[Path, str, None] = None, - timeout: int = 30 * 60, + timeout: Optional[int] = None, ) -> Union[None, bytes]: """ Downloads the result of a process graph synchronously, @@ -1401,7 +1461,13 @@ def download( :param timeout: timeout to wait for response """ request = self._build_request_with_process_graph(process_graph=graph) - response = self.post(path="/result", json=request, expected_status=200, stream=True, timeout=timeout) + response = self.post( + path="/result", + json=request, + expected_status=200, + stream=True, + timeout=timeout or DEFAULT_TIMEOUT_SYNCHRONOUS_EXECUTE, + ) if outputfile is not None: with Path(outputfile).open(mode="wb") as f: @@ -1410,7 +1476,11 @@ def download( else: return response.content - def execute(self, process_graph: Union[dict, str, Path]): + def execute( + self, + process_graph: Union[dict, str, Path], + timeout: Optional[int] = None, + ): """ Execute a process graph synchronously and return the result (assumed to be JSON). @@ -1419,7 +1489,12 @@ def execute(self, process_graph: Union[dict, str, Path]): :return: parsed JSON response """ req = self._build_request_with_process_graph(process_graph=process_graph) - return self.post(path="/result", json=req, expected_status=200).json() + return self.post( + path="/result", + json=req, + expected_status=200, + timeout=timeout or DEFAULT_TIMEOUT_SYNCHRONOUS_EXECUTE, + ).json() def create_job( self, diff --git a/openeo/rest/datacube.py b/openeo/rest/datacube.py index 154893a6f..065acf493 100644 --- a/openeo/rest/datacube.py +++ b/openeo/rest/datacube.py @@ -981,10 +981,13 @@ def aggregate_spatial_window( :param boundary: Behavior to apply if the number of values for the axes x and y is not a multiple of the corresponding value in the size parameter. Options are: - pad (default): pad the data cube with the no-data value null to fit the required window size. - trim: trim the data cube to fit the required window size. - Set the parameter align to specifies to which corner the data is aligned to. - :param align: If the data requires padding or trimming (see parameter boundary), specifies + + - ``pad`` (default): pad the data cube with the no-data value null to fit the required window size. + - ``trim``: trim the data cube to fit the required window size. + + Use the parameter ``align`` to align the data to the desired corner. + + :param align: If the data requires padding or trimming (see parameter ``boundary``), specifies to which corner of the spatial extent the data is aligned to. For example, if the data is aligned to the upper left, the process pads/trims at the lower-right. :param context: Additional data to be passed to the process. @@ -1043,12 +1046,6 @@ def apply_dimension( the dimension labels will be incrementing integers starting from zero, which can be changed using rename_labels afterwards. 
The number of labels will equal to the number of values computed by the process. - .. note:: - .. versionchanged:: 0.13.0 - arguments ``code``, ``runtime`` and ``version`` are deprecated if favor of the standard approach - of using an :py:class:`UDF ` object in the ``process`` argument. - See :ref:`old_udf_api` for more background about the changes. - :param code: [**deprecated**] UDF code or process identifier (optional) :param runtime: [**deprecated**] UDF runtime to use (optional) :param process: the "child callback": @@ -1075,6 +1072,12 @@ :return: A datacube with the UDF applied to the given dimension. :raises: DimensionNotAvailable + + .. versionchanged:: 0.13.0 + arguments ``code``, ``runtime`` and ``version`` are deprecated in favor of the standard approach + of using an :py:class:`UDF ` object in the ``process`` argument. + See :ref:`old_udf_api` for more background about the changes. + """ # TODO #137 #181 #312 remove support for code/runtime/version if runtime or (isinstance(code, str) and "\n" in code): diff --git a/openeo/rest/vectorcube.py b/openeo/rest/vectorcube.py index c9e63ff30..77e0b2bb8 100644 --- a/openeo/rest/vectorcube.py +++ b/openeo/rest/vectorcube.py @@ -1,14 +1,19 @@ +import json import pathlib import typing -from typing import Union, Optional +from typing import List, Optional, Union +import shapely.geometry.base + +import openeo +from openeo.api.process import Parameter from openeo.internal.documentation import openeo_process from openeo.internal.graph_building import PGNode from openeo.internal.warnings import legacy_alias from openeo.metadata import CollectionMetadata -from openeo.rest._datacube import _ProcessGraphAbstraction, UDF -from openeo.rest.mlmodel import MlModel +from openeo.rest._datacube import UDF, _ProcessGraphAbstraction from openeo.rest.job import BatchJob +from openeo.rest.mlmodel import MlModel from openeo.util import dict_no_none, guess_format if typing.TYPE_CHECKING: @@ -42,11 +47,81 @@ def process( :param process_id: process id of the process. :param args: argument dictionary for the process. - :return: new DataCube instance + :return: new VectorCube instance """ pg = self._build_pgnode(process_id=process_id, arguments=arguments, namespace=namespace, **kwargs) return VectorCube(graph=pg, connection=self._connection, metadata=metadata or self.metadata) + @classmethod + @openeo_process + def load_geojson( + cls, + connection: "openeo.Connection", + data: Union[dict, str, pathlib.Path, shapely.geometry.base.BaseGeometry, Parameter], + properties: Optional[List[str]] = None, + ) -> "VectorCube": + """ + Converts GeoJSON data as defined by RFC 7946 into a vector data cube. + + :param connection: the connection to use to connect with the openEO back-end. + :param data: the geometry to load. One of: + + - GeoJSON-style data structure: e.g. a dictionary with ``"type": "Polygon"`` and ``"coordinates"`` fields + - a path to a local GeoJSON file + - a GeoJSON string + - a shapely geometry object + + :param properties: A list of properties from the GeoJSON file to construct an additional dimension from. + :return: new VectorCube instance + + .. warning:: EXPERIMENTAL: this process is experimental with the potential for major things to change. + + .. versionadded:: 0.22.0 + """ + # TODO: unify with `DataCube._get_geometry_argument` + # TODO: also support client side fetching of GeoJSON from URL? 
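+        # (A client-side fetch for that TODO could hypothetically look like this,
+        # assuming `requests` is imported and the URL indeed returns GeoJSON:
+        #     if isinstance(data, str) and data.startswith(("http://", "https://")):
+        #         geometry = requests.get(data, timeout=60).json()
+        # )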
+ if isinstance(data, str) and data.strip().startswith("{"): + # Assume JSON dump + geometry = json.loads(data) + elif isinstance(data, (str, pathlib.Path)): + # Assume local file + with pathlib.Path(data).open(mode="r", encoding="utf-8") as f: + geometry = json.load(f) + assert isinstance(geometry, dict) + elif isinstance(data, shapely.geometry.base.BaseGeometry): + geometry = shapely.geometry.mapping(data) + elif isinstance(data, Parameter): + geometry = data + elif isinstance(data, dict): + geometry = data + else: + raise ValueError(data) + + pg = PGNode(process_id="load_geojson", data=geometry, properties=properties or []) + return cls(graph=pg, connection=connection) + + @classmethod + @openeo_process + def load_url( + cls, connection: "openeo.Connection", url: str, format: str, options: Optional[dict] = None + ) -> "VectorCube": + """ + Loads a file from a URL + + :param connection: the connection to use to connect with the openEO back-end. + :param url: The URL to read from. Authentication details such as API keys or tokens may need to be included in the URL. + :param format: The file format to use when loading the data. + :param options: The file format parameters to use when reading the data. + Must correspond to the parameters that the server reports as supported parameters for the chosen ``format`` + :return: new VectorCube instance + + .. warning:: EXPERIMENTAL: this process is experimental with the potential for major things to change. + + .. versionadded:: 0.22.0 + """ + pg = PGNode(process_id="load_url", arguments=dict_no_none(url=url, format=format, options=options)) + return cls(graph=pg, connection=connection) + @openeo_process def run_udf( self, diff --git a/openeo/udf/feature_collection.py b/openeo/udf/feature_collection.py index ed30c554d..df3fbc222 100644 --- a/openeo/udf/feature_collection.py +++ b/openeo/udf/feature_collection.py @@ -40,8 +40,10 @@ def __init__( for each spatial x,y slice, if no end times are defined, then time instances are assumed not intervals """ + # TODO #455 `id` is first and a required argument, but it's unclear what it can/should be used for. Can we eliminate it? self.id = id self._data = data + # TODO #455 why not include these datetimes directly in the dataframe? self._start_times = self._as_datetimeindex(start_times, expected_length=len(self.data)) self._end_times = self._as_datetimeindex(end_times, expected_length=len(self.data)) diff --git a/openeo/udf/udf_data.py b/openeo/udf/udf_data.py index b26cd16e8..8ea798665 100644 --- a/openeo/udf/udf_data.py +++ b/openeo/udf/udf_data.py @@ -32,7 +32,7 @@ def __init__( The constructor of the UDF argument class that stores all data required by the user defined function. - :param proj: A dictionary of form {"proj type string": "projection description"} i. e. {"EPSG":4326} + :param proj: A dictionary of form {"proj type string": "projection description"} e.g. {"EPSG": 4326} :param datacube_list: A list of data cube objects :param feature_collection_list: A list of VectorTile objects :param structured_data_list: A list of structured data objects diff --git a/tests/extra/test_job_management.py b/tests/extra/test_job_management.py index f151561c5..952b40ef6 100644 --- a/tests/extra/test_job_management.py +++ b/tests/extra/test_job_management.py @@ -1,4 +1,7 @@ import json +import multiprocessing +import platform +import threading from unittest import mock # TODO: can we avoid using httpretty? 
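For context on the loop exercised by the test below: a minimal `MultiBackendJobManager` round trip looks roughly like this sketch (the back-end URL, collection id and file names are placeholders, not part of this changeset):

from pathlib import Path

import pandas as pd

import openeo
from openeo.extra.job_management import MultiBackendJobManager

manager = MultiBackendJobManager(root_dir="jobs")
manager.add_backend("example", connection=openeo.connect("https://openeo.example"))

# One dataframe row per job; run_jobs() keeps polling until no row is left in a
# non-terminal status (after this fix: "finished", "skipped", "start_failed" or "error").
df = pd.DataFrame({"year": [2021, 2022]})

def start_job(row, connection, **kwargs):
    # Placeholder job factory: build and return a batch job for this row.
    cube = connection.load_collection("EXAMPLE_COLLECTION")
    return cube.create_job(title=f"Job for year {row['year']}")

manager.run_jobs(df=df, start_job=start_job, output_file=Path("jobs.csv"))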
@@ -116,6 +119,138 @@ def start_job(row, connection, **kwargs): metadata_path = manager.get_job_metadata_path(job_id="job-2022") assert metadata_path.exists() + def test_manager_must_exit_when_all_jobs_done(self, tmp_path, requests_mock, sleep_mock): + """Make sure the MultiBackendJobManager does not hang after all jobs finish. + + Regression test for: + https://github.com/Open-EO/openeo-python-client/issues/432 + + Cause was that run_jobs had an infinite loop when jobs ended with status error. + """ + + requests_mock.get("http://foo.test/", json={"api_version": "1.1.0"}) + requests_mock.get("http://bar.test/", json={"api_version": "1.1.0"}) + + def mock_job_status(job_id, succeeds: bool): + """Mock the job status polling sequence. + We set up one job that finishes with status error. + """ + response_list = sum( + [ + [ + { + "json": { + "id": job_id, + "title": f"Job {job_id}", + "status": "queued", + } + } + ], + [ + { + "json": { + "id": job_id, + "title": f"Job {job_id}", + "status": "running", + } + } + ], + [ + { + "json": { + "id": job_id, + "title": f"Job {job_id}", + "status": "finished" if succeeds else "error", + } + } + ], + ], + [], + ) + for backend in ["http://foo.test", "http://bar.test"]: + requests_mock.get(f"{backend}/jobs/{job_id}", response_list) + # It also needs the job results endpoint for successful jobs and the + # log endpoint for a failed job. Both are dummy implementations. + # When the job is finished the system tries to download the + # results or the logs and that is what needs these endpoints. + if succeeds: + requests_mock.get(f"{backend}/jobs/{job_id}/results", json={"links": []}) + else: + response = { + "level": "error", + "logs": [ + { + "id": "1", + "code": "SampleError", + "level": "error", + "message": "Error for testing", + "time": "2019-08-24T14:15:22Z", + "data": None, + "path": [], + "usage": {}, + "links": [], + } + ], + "links": [], + } + requests_mock.get(f"{backend}/jobs/{job_id}/logs?level=error", json=response) + + mock_job_status("job-2018", succeeds=True) + mock_job_status("job-2019", succeeds=True) + mock_job_status("job-2020", succeeds=True) + mock_job_status("job-2021", succeeds=True) + mock_job_status("job-2022", succeeds=False) + + root_dir = tmp_path / "job_mgr_root" + manager = MultiBackendJobManager(root_dir=root_dir) + + manager.add_backend("foo", connection=openeo.connect("http://foo.test")) + manager.add_backend("bar", connection=openeo.connect("http://bar.test")) + + df = pd.DataFrame( + { + "year": [2018, 2019, 2020, 2021, 2022], + # Use simple points in WKT format to test conversion to the geometry dtype + "geometry": ["POINT (1 2)"] * 5, + } + ) + output_file = tmp_path / "jobs.csv" + + def start_job(row, connection, **kwargs): + year = row["year"] + return BatchJob(job_id=f"job-{year}", connection=connection) + + is_done_file = tmp_path / "is_done.txt" + + def start_worker_thread(): + manager.run_jobs(df=df, start_job=start_job, output_file=output_file) + is_done_file.write_text("Done!") + + thread = threading.Thread(target=start_worker_thread, name="Worker process", daemon=True) + + timeout_sec = 5.0 + thread.start() + # We stop waiting for the thread after the timeout. + # If that happens, it is likely that run_jobs is looping infinitely. + thread.join(timeout=timeout_sec) + + assert is_done_file.exists(), ( + "MultiBackendJobManager did not finish on its own and was killed. " + "Infinite loop is probable." + ) + + # Also check that we got sensible end results. 
+ result = pd.read_csv(output_file) + assert len(result) == 5 + assert set(result.status) == {"finished", "error"} + assert set(result.backend_name) == {"foo", "bar"} + + # We expect that the job metadata was saved for a successful job, + # so verify that it exists. + # Checking it for one of the jobs is enough. + metadata_path = manager.get_job_metadata_path(job_id="job-2021") + assert metadata_path.exists() + + def test_on_error_log(self, tmp_path, requests_mock): backend = "http://foo.test" requests_mock.get(backend, json={"api_version": "1.1.0"}) diff --git a/tests/rest/conftest.py b/tests/rest/conftest.py index ffe27ba20..7e3df403f 100644 --- a/tests/rest/conftest.py +++ b/tests/rest/conftest.py @@ -8,6 +8,7 @@ import pytest import time_machine +from openeo.rest._testing import DummyBackend from openeo.rest.connection import Connection API_URL = "https://oeo.test/" @@ -71,8 +72,20 @@ def assert_oidc_device_code_flow(url: str = "https://oidc.test/dc", elapsed: flo return assert_oidc_device_code_flow +@pytest.fixture +def con100(requests_mock): + requests_mock.get(API_URL, json={"api_version": "1.0.0"}) + con = Connection(API_URL) + return con + + @pytest.fixture def con120(requests_mock): requests_mock.get(API_URL, json={"api_version": "1.2.0"}) con = Connection(API_URL) return con + + +@pytest.fixture +def dummy_backend(requests_mock, con100) -> DummyBackend: + yield DummyBackend(requests_mock=requests_mock, connection=con100) diff --git a/tests/rest/datacube/test_vectorcube.py b/tests/rest/datacube/test_vectorcube.py index c89ec09da..5f5813dc6 100644 --- a/tests/rest/datacube/test_vectorcube.py +++ b/tests/rest/datacube/test_vectorcube.py @@ -1,10 +1,11 @@ -import re from pathlib import Path import pytest +import shapely.geometry -from openeo import Connection +from openeo.api.process import Parameter from openeo.internal.graph_building import PGNode +from openeo.rest._testing import DummyBackend from openeo.rest.vectorcube import VectorCube @@ -14,103 +15,6 @@ def vector_cube(con100) -> VectorCube: return VectorCube(graph=pgnode, connection=con100) -class DummyBackend: - """ - Dummy backend that handles sync/batch execution requests - and allows inspection of posted process graphs - """ - - def __init__(self, requests_mock, connection: Connection): - self.connection = connection - self.sync_requests = [] - self.batch_jobs = {} - self.next_result = b"Result data" - requests_mock.post(connection.build_url("/result"), content=self._handle_post_result) - requests_mock.post(connection.build_url("/jobs"), content=self._handle_post_jobs) - requests_mock.post( - re.compile(connection.build_url(r"/jobs/(job-\d+)/results$")), content=self._handle_post_job_results - ) - requests_mock.get(re.compile(connection.build_url(r"/jobs/(job-\d+)$")), json=self._handle_get_job) - requests_mock.get( - re.compile(connection.build_url(r"/jobs/(job-\d+)/results$")), json=self._handle_get_job_results - ) - requests_mock.get( - re.compile(connection.build_url("/jobs/(.*?)/results/result.data$")), - content=self._handle_get_job_result_asset, - ) - - def _handle_post_result(self, request, context): - """handler of `POST /result` (synchronous execute)""" - pg = request.json()["process"]["process_graph"] - self.sync_requests.append(pg) - return self.next_result - - def _handle_post_jobs(self, request, context): - """handler of `POST /jobs` (create batch job)""" - pg = request.json()["process"]["process_graph"] - job_id = f"job-{len(self.batch_jobs):03d}" - self.batch_jobs[job_id] = {"job_id": job_id, "pg": pg, 
"status": "created"} - context.status_code = 201 - context.headers["openeo-identifier"] = job_id - - def _get_job_id(self, request) -> str: - match = re.match(r"^/jobs/(job-\d+)(/|$)", request.path) - if not match: - raise ValueError(f"Failed to extract job_id from {request.path}") - job_id = match.group(1) - assert job_id in self.batch_jobs - return job_id - - def _handle_post_job_results(self, request, context): - """Handler of `POST /job/{job_id}/results` (start batch job).""" - job_id = self._get_job_id(request) - assert self.batch_jobs[job_id]["status"] == "created" - # TODO: support custom status sequence (instead of directly going to status "finished")? - self.batch_jobs[job_id]["status"] = "finished" - context.status_code = 202 - - def _handle_get_job(self, request, context): - """Handler of `GET /job/{job_id}` (get batch job status and metadata).""" - job_id = self._get_job_id(request) - return {"id": job_id, "status": self.batch_jobs[job_id]["status"]} - - def _handle_get_job_results(self, request, context): - """Handler of `GET /job/{job_id}/results` (list batch job results).""" - job_id = self._get_job_id(request) - assert self.batch_jobs[job_id]["status"] == "finished" - return { - "id": job_id, - "assets": {"result.data": {"href": self.connection.build_url(f"/jobs/{job_id}/results/result.data")}}, - } - - def _handle_get_job_result_asset(self, request, context): - """Handler of `GET /job/{job_id}/results/result.data` (get batch job result asset).""" - job_id = self._get_job_id(request) - assert self.batch_jobs[job_id]["status"] == "finished" - return self.next_result - - def get_sync_pg(self) -> dict: - """Get one and only synchronous process graph""" - assert len(self.sync_requests) == 1 - return self.sync_requests[0] - - def get_batch_pg(self) -> dict: - """Get one and only batch process graph""" - assert len(self.batch_jobs) == 1 - return self.batch_jobs[max(self.batch_jobs.keys())]["pg"] - - def get_pg(self) -> dict: - """Get one and only batch process graph (sync or batch)""" - pgs = self.sync_requests + [b["pg"] for b in self.batch_jobs.values()] - assert len(pgs) == 1 - return pgs[0] - - -@pytest.fixture -def dummy_backend(requests_mock, con100) -> DummyBackend: - yield DummyBackend(requests_mock=requests_mock, connection=con100) - - def test_raster_to_vector(con100): img = con100.load_collection("S2") vector_cube = img.raster_to_vector() @@ -175,7 +79,8 @@ def test_download_auto_save_result_only_file( "result": True, }, } - assert output_path.read_bytes() == b"Result data" + assert output_path.read_bytes() == DummyBackend.DEFAULT_RESULT + assert output_path.read_bytes() == DummyBackend.DEFAULT_RESULT @pytest.mark.parametrize( @@ -216,7 +121,7 @@ def test_download_auto_save_result_with_format( "result": True, }, } - assert output_path.read_bytes() == b"Result data" + assert output_path.read_bytes() == DummyBackend.DEFAULT_RESULT @pytest.mark.parametrize("exec_mode", ["sync", "batch"]) @@ -244,7 +149,7 @@ def test_download_auto_save_result_with_options(vector_cube, dummy_backend, tmp_ "result": True, }, } - assert output_path.read_bytes() == b"Result data" + assert output_path.read_bytes() == DummyBackend.DEFAULT_RESULT @pytest.mark.parametrize( @@ -278,4 +183,73 @@ def test_save_result_and_download( "result": True, }, } - assert output_path.read_bytes() == b"Result data" + assert output_path.read_bytes() == DummyBackend.DEFAULT_RESULT + + +@pytest.mark.parametrize( + "data", + [ + {"type": "Polygon", "coordinates": [[[1, 2], [3, 2], [3, 4], [1, 4], [1, 2]]]}, + 
"""{"type": "Polygon", "coordinates": [[[1, 2], [3, 2], [3, 4], [1, 4], [1, 2]]]}""", + shapely.geometry.Polygon([[1, 2], [3, 2], [3, 4], [1, 4], [1, 2]]), + ], +) +def test_load_geojson_basic(con100, data, dummy_backend): + vc = VectorCube.load_geojson(connection=con100, data=data) + assert isinstance(vc, VectorCube) + vc.execute() + assert dummy_backend.get_pg() == { + "loadgeojson1": { + "process_id": "load_geojson", + "arguments": { + "data": {"type": "Polygon", "coordinates": [[[1, 2], [3, 2], [3, 4], [1, 4], [1, 2]]]}, + "properties": [], + }, + "result": True, + } + } + + +@pytest.mark.parametrize("path_type", [str, Path]) +def test_load_geojson_path(con100, dummy_backend, tmp_path, path_type): + path = tmp_path / "geometry.json" + path.write_text("""{"type": "Polygon", "coordinates": [[[1, 2], [3, 2], [3, 4], [1, 4], [1, 2]]]}""") + vc = VectorCube.load_geojson(connection=con100, data=path_type(path)) + assert isinstance(vc, VectorCube) + vc.execute() + assert dummy_backend.get_pg() == { + "loadgeojson1": { + "process_id": "load_geojson", + "arguments": { + "data": {"type": "Polygon", "coordinates": [[[1, 2], [3, 2], [3, 4], [1, 4], [1, 2]]]}, + "properties": [], + }, + "result": True, + } + } + + +def test_load_geojson_parameter(con100, dummy_backend): + vc = VectorCube.load_geojson(connection=con100, data=Parameter.datacube()) + assert isinstance(vc, VectorCube) + vc.execute() + assert dummy_backend.get_pg() == { + "loadgeojson1": { + "process_id": "load_geojson", + "arguments": {"data": {"from_parameter": "data"}, "properties": []}, + "result": True, + } + } + + +def test_load_url(con100, dummy_backend): + vc = VectorCube.load_url(connection=con100, url="https://example.com/geometry.json", format="GeoJSON") + assert isinstance(vc, VectorCube) + vc.execute() + assert dummy_backend.get_pg() == { + "loadurl1": { + "process_id": "load_url", + "arguments": {"url": "https://example.com/geometry.json", "format": "GeoJSON"}, + "result": True, + } + } diff --git a/tests/rest/test_connection.py b/tests/rest/test_connection.py index 942abb354..8dce57086 100644 --- a/tests/rest/test_connection.py +++ b/tests/rest/test_connection.py @@ -12,6 +12,7 @@ import pytest import requests.auth import requests_mock +import shapely.geometry import openeo from openeo.capabilities import ApiVersionException, ComparableVersion @@ -21,7 +22,15 @@ from openeo.rest.auth.auth import BearerAuth, NullAuth from openeo.rest.auth.oidc import OidcException from openeo.rest.auth.testing import ABSENT, OidcMock -from openeo.rest.connection import Connection, RestApiConnection, connect, paginate +from openeo.rest.connection import ( + DEFAULT_TIMEOUT, + DEFAULT_TIMEOUT_SYNCHRONOUS_EXECUTE, + Connection, + RestApiConnection, + connect, + paginate, +) +from openeo.rest.vectorcube import VectorCube from openeo.util import ContextTimer from .. 
import load_json_resource @@ -266,7 +275,7 @@ def test_connection_with_session(): conn = Connection("https://oeo.test/", session=session) assert conn.capabilities().capabilities["foo"] == "bar" session.request.assert_any_call( - url="https://oeo.test/", method="get", headers=mock.ANY, stream=mock.ANY, auth=mock.ANY, timeout=None + url="https://oeo.test/", method="get", headers=mock.ANY, stream=mock.ANY, auth=mock.ANY, timeout=DEFAULT_TIMEOUT ) @@ -278,7 +287,7 @@ def test_connect_with_session(): conn = connect("https://oeo.test/", session=session) assert conn.capabilities().capabilities["foo"] == "bar" session.request.assert_any_call( - url="https://oeo.test/", method="get", headers=mock.ANY, stream=mock.ANY, auth=mock.ANY, timeout=None + url="https://oeo.test/", method="get", headers=mock.ANY, stream=mock.ANY, auth=mock.ANY, timeout=DEFAULT_TIMEOUT ) @@ -2352,6 +2361,47 @@ def test_extents(self, con120): } +@pytest.mark.parametrize( + "data", + [ + {"type": "Polygon", "coordinates": [[[1, 2], [3, 2], [3, 4], [1, 4], [1, 2]]]}, + """{"type": "Polygon", "coordinates": [[[1, 2], [3, 2], [3, 4], [1, 4], [1, 2]]]}""", + shapely.geometry.Polygon([[1, 2], [3, 2], [3, 4], [1, 4], [1, 2]]), + ], +) +def test_load_geojson(con100, data, dummy_backend): + vc = con100.load_geojson(data) + assert isinstance(vc, VectorCube) + vc.execute() + assert dummy_backend.get_pg() == { + "loadgeojson1": { + "process_id": "load_geojson", + "arguments": { + "data": {"type": "Polygon", "coordinates": [[[1, 2], [3, 2], [3, 4], [1, 4], [1, 2]]]}, + "properties": [], + }, + "result": True, + } + } + + +def test_load_url(con100, dummy_backend, requests_mock): + file_formats = { + "input": {"GeoJSON": {"gis_data_type": ["vector"]}}, + } + requests_mock.get(API_URL + "file_formats", json=file_formats) + vc = con100.load_url("https://example.com/geometry.json", format="GeoJSON") + assert isinstance(vc, VectorCube) + vc.execute() + assert dummy_backend.get_pg() == { + "loadurl1": { + "process_id": "load_url", + "arguments": {"url": "https://example.com/geometry.json", "format": "GeoJSON"}, + "result": True, + } + } + + def test_list_file_formats(requests_mock): requests_mock.get(API_URL, json={"api_version": "1.0.0"}) conn = Connection(API_URL) @@ -2515,7 +2565,7 @@ def test_default_timeout_default(requests_mock): requests_mock.get(API_URL, json={"api_version": "1.0.0"}) requests_mock.get("/foo", text=lambda req, ctx: repr(req.timeout)) conn = connect(API_URL) - assert conn.get("/foo").text == 'None' + assert conn.get("/foo").text == str(DEFAULT_TIMEOUT) assert conn.get("/foo", timeout=5).text == '5' @@ -2532,6 +2582,32 @@ def flat_graph(self) -> typing.Dict[str, dict]: return {"foo1": {"process_id": "foo"}} +@pytest.mark.parametrize( + "pg", + [ + {"foo1": {"process_id": "foo"}}, + {"process_graph": {"foo1": {"process_id": "foo"}}}, + DummyFlatGraphable(), + ], +) +def test_download_100(requests_mock, pg): + requests_mock.get(API_URL, json={"api_version": "1.0.0"}) + conn = Connection(API_URL) + with mock.patch.object(conn, "request") as request: + conn.download(pg) + assert request.call_args_list == [ + mock.call( + "post", + path="/result", + allow_redirects=False, + stream=True, + expected_status=200, + json={"process": {"process_graph": {"foo1": {"process_id": "foo"}}}}, + timeout=DEFAULT_TIMEOUT_SYNCHRONOUS_EXECUTE, + ) + ] + + @pytest.mark.parametrize( "pg", [ @@ -2547,8 +2623,12 @@ def test_execute_100(requests_mock, pg): conn.execute(pg) assert request.call_args_list == [ mock.call( - "post", path="/result", 
allow_redirects=False, expected_status=200, - json={"process": {"process_graph": {"foo1": {"process_id": "foo"}}}} + "post", + path="/result", + allow_redirects=False, + expected_status=200, + json={"process": {"process_graph": {"foo1": {"process_id": "foo"}}}}, + timeout=DEFAULT_TIMEOUT_SYNCHRONOUS_EXECUTE, ) ]
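Taken together, the user-facing additions in this changeset can be exercised along these lines (a sketch: the back-end and geometry URLs are placeholders, and the back-end must support the experimental `load_geojson`/`load_url` processes):

import openeo

con = openeo.connect("https://openeo.example")

# load_geojson() accepts a GeoJSON-style dict, a GeoJSON string,
# a local file path or a shapely geometry:
polygon = {"type": "Polygon", "coordinates": [[[1, 2], [3, 2], [3, 4], [1, 4], [1, 2]]]}
vc = con.load_geojson(polygon)

# load_url() reads vector data behind a URL, in a format the back-end
# lists among its "input" file formats:
vc2 = con.load_url("https://example.com/geometry.json", format="GeoJSON")

# Synchronous requests now default to finite timeouts (30 minutes for
# execute/download); an explicit per-call override is still possible:
result = vc.execute()
con.download(vc2, outputfile="geometry.out", timeout=10 * 60)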