
initial PoC implementation of UDPJobFactory #644

Merged
merged 22 commits into from
Oct 16, 2024
Changes from 19 commits
Commits (22)
ec982c2
Issue #604 initial PoC implementation of UDPJobFactory
soxofaan Oct 10, 2024
4e04351
Issue #604/#644 refactor out parse_remote_process_definition as stand…
soxofaan Oct 11, 2024
04bc791
Issue #604/#644 explicitly use default value from schema as fallback
soxofaan Oct 11, 2024
ff8b553
Issue #604/#644 add UDPJobFactory+MultiBackendJobManager tests
soxofaan Oct 11, 2024
64cafcf
Issue #645 introduce returning event stats from MultiBackendJobManage…
soxofaan Oct 11, 2024
04296c1
Issue #604/#644 more UDPJobFactory+MultiBackendJobManager tests
soxofaan Oct 11, 2024
f8db877
Issue #604/#644 UDPJobFactory: improve geometry support
soxofaan Oct 11, 2024
ff9c3f2
Issue #604/#644 test coverage for personal UDP mode
soxofaan Oct 11, 2024
632e239
Issue #604/#644 test coverage for geometry handling after resume
soxofaan Oct 11, 2024
5b1e8fa
MultiBackendJobManager: fix another SettingWithCopyWarning related bug
soxofaan Oct 11, 2024
fd9fdb8
Issue #604/#644 test coverage for resuming: also parquet
soxofaan Oct 11, 2024
ade5258
Issue #604/#644 fix title/description
soxofaan Oct 11, 2024
d29d3ee
Issue #604/#644 changelog entry and usage example
soxofaan Oct 14, 2024
a92e47f
Issue #604/#644 add process_id to docs
soxofaan Oct 14, 2024
84ee8ea
Issue #604/#644 UDPJobFactory: make process_id optional (if namespace…
soxofaan Oct 14, 2024
ddc9ee5
Issue #604/#644 replace lru_cache trick with cleaner cache
soxofaan Oct 14, 2024
e22a791
Issue #604/#644 further documentation finetuning
soxofaan Oct 14, 2024
130db87
Issue #604/#644 add tests for parameter_column_map
soxofaan Oct 14, 2024
2667733
Issue #604/#644 add some todo note for further development ideas
soxofaan Oct 14, 2024
1917a73
Issue #604/#644 finetune docs based on review
soxofaan Oct 16, 2024
8ffa2a6
Issue #604/#644 Rename UDPJobFactory to ProcessBasedJobCreator
soxofaan Oct 16, 2024
7bf73de
Issue #604/#644 move ProcessBasedJobCreator example to more extensive…
soxofaan Oct 16, 2024
4 changes: 2 additions & 2 deletions CHANGELOG.md
@@ -13,8 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `MultiBackendJobManager`: Added `initialize_from_df(df)` (to `CsvJobDatabase` and `ParquetJobDatabase`) to initialize (and persist) the job database from a given DataFrame.
Also added `create_job_db()` factory to easily create a job database from a given dataframe and its type guessed from filename extension.
([#635](https://github.com/Open-EO/openeo-python-client/issues/635))


- `MultiBackendJobManager.run_jobs()` now returns a dictionary with counters/stats about various events during the job run ([#645](https://github.com/Open-EO/openeo-python-client/issues/645))
- Added `UDPJobFactory` to be used as `start_job` callable with `MultiBackendJobManager` to create multiple jobs from a single parameterized process (e.g. a UDP or remote process definition) ([#604](https://github.com/Open-EO/openeo-python-client/issues/604))

### Changed

4 changes: 4 additions & 0 deletions docs/cookbook/job_manager.rst
@@ -14,3 +14,7 @@ Multi Backend Job Manager
.. autoclass:: openeo.extra.job_management.CsvJobDatabase

.. autoclass:: openeo.extra.job_management.ParquetJobDatabase

.. autoclass:: openeo.extra.job_management.UDPJobFactory
:members:
:special-members: __call__
260 changes: 249 additions & 11 deletions openeo/extra/job_management.py

Large diffs are not rendered by default.

100 changes: 74 additions & 26 deletions openeo/internal/processes/parse.py
@@ -6,18 +6,18 @@
from __future__ import annotations

import json
import re
import typing
from pathlib import Path
from typing import Iterator, List, Union
from typing import Any, Iterator, List, Optional, Union

import requests


class Schema:
class Schema(typing.NamedTuple):
"""Schema description of an openEO process parameter or return value."""

def __init__(self, schema: Union[dict, list]):
self.schema = schema
schema: Union[dict, list]

@classmethod
def from_dict(cls, data: dict) -> Schema:
@@ -31,33 +31,44 @@ def is_process_graph(self) -> bool:
and self.schema.get("subtype") == "process-graph"
)

def accepts_geojson(self) -> bool:
"""Does this schema accept inline GeoJSON objects?"""

class Parameter:
"""openEO process parameter"""
def is_geojson_schema(schema) -> bool:
return isinstance(schema, dict) and schema.get("type") == "object" and schema.get("subtype") == "geojson"

# TODO unify with openeo.api.process.Parameter?
if isinstance(self.schema, dict):
return is_geojson_schema(self.schema)
elif isinstance(self.schema, list):
return any(is_geojson_schema(s) for s in self.schema)
return False
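The `accepts_geojson` check above boils down to a small predicate applied to either a single schema dict or a list of alternative schemas. A standalone sketch of that logic (mirroring the diff, outside the `Schema` class):

```python
from typing import Union


def is_geojson_schema(schema) -> bool:
    # A GeoJSON-accepting schema is an "object" with subtype "geojson"
    return isinstance(schema, dict) and schema.get("type") == "object" and schema.get("subtype") == "geojson"


def accepts_geojson(schema: Union[dict, list]) -> bool:
    # A parameter schema can be a single schema dict or a list of alternatives
    if isinstance(schema, dict):
        return is_geojson_schema(schema)
    elif isinstance(schema, list):
        return any(is_geojson_schema(s) for s in schema)
    return False
```

The list branch means a parameter that accepts "GeoJSON or URL to a vector file" style unions is still recognized as geometry-capable.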

NO_DEFAULT = object()

def __init__(self, name: str, description: str, schema: Schema, default=NO_DEFAULT, optional: bool = False):
self.name = name
self.description = description
self.schema = schema
self.default = default
self.optional = optional
_NO_DEFAULT = object()


class Parameter(typing.NamedTuple):
"""openEO process parameter"""
# TODO unify with openeo.api.process.Parameter?

name: str
description: str
schema: Schema
default: Any = _NO_DEFAULT
optional: bool = False

@classmethod
def from_dict(cls, data: dict) -> Parameter:
return cls(
name=data["name"],
description=data["description"],
schema=Schema.from_dict(data["schema"]),
default=data.get("default", cls.NO_DEFAULT),
default=data.get("default", _NO_DEFAULT),
optional=data.get("optional", False),
)

def has_default(self):
return self.default is not self.NO_DEFAULT
return self.default is not _NO_DEFAULT
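The `_NO_DEFAULT` sentinel above lets `has_default()` distinguish "no default given" from an explicit default of `None`, which a plain `default=None` could not. A minimal, simplified sketch of the pattern (other fields like `schema` omitted for brevity):

```python
import typing
from typing import Any

# Unique sentinel object: distinct from None and from any real default value
_NO_DEFAULT = object()


class Parameter(typing.NamedTuple):
    name: str
    default: Any = _NO_DEFAULT

    def has_default(self) -> bool:
        # Identity check against the sentinel, not an equality check
        return self.default is not _NO_DEFAULT
```

Note that a parameter with an explicit default of `None` still "has a default", while an unset one does not.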


class Returns:
@@ -73,24 +84,31 @@ def from_dict(cls, data: dict) -> Returns:


class Process(typing.NamedTuple):
"""An openEO process"""

"""
Container for an openEO process definition,
covering pre-defined processes, user-defined processes,
remote process definitions, etc.
"""

# As a common denominator, only the process id is a required field in a process definition.
# Depending on the context in the openEO API, some other fields (e.g. "process_graph")
# may also be required.
id: str
parameters: List[Parameter]
returns: Returns
description: str = ""
summary: str = ""
parameters: Optional[List[Parameter]] = None
returns: Optional[Returns] = None
description: Optional[str] = None
summary: Optional[str] = None
# TODO: more properties?

@classmethod
def from_dict(cls, data: dict) -> Process:
"""Construct openEO process from dictionary values"""
return cls(
id=data["id"],
parameters=[Parameter.from_dict(d) for d in data["parameters"]],
returns=Returns.from_dict(data["returns"]),
description=data["description"],
summary=data["summary"],
parameters=[Parameter.from_dict(d) for d in data["parameters"]] if "parameters" in data else None,
returns=Returns.from_dict(data["returns"]) if "returns" in data else None,
description=data.get("description"),
summary=data.get("summary"),
)

@classmethod
@@ -114,3 +132,33 @@ def parse_all_from_dir(path: Union[str, Path], pattern="*.json") -> Iterator[Pro
"""Parse all openEO process files in given directory"""
for p in sorted(Path(path).glob(pattern)):
yield Process.from_json_file(p)


def parse_remote_process_definition(namespace: str, process_id: Optional[str] = None) -> Process:
"""
Parse a process definition as defined by the "Remote Process Definition Extension" spec
https://github.com/Open-EO/openeo-api/tree/draft/extensions/remote-process-definition
"""
if not re.match("https?://", namespace):
raise ValueError(f"Expected absolute URL, but got {namespace!r}")

resp = requests.get(url=namespace)
resp.raise_for_status()
data = resp.json()
assert isinstance(data, dict)

if "id" not in data and "processes" in data and isinstance(data["processes"], list):
# Handle process listing: pick out the right process
if not isinstance(process_id, str):
raise ValueError(f"Working with process listing, but got invalid process id {process_id!r}")
processes = [p for p in data["processes"] if p.get("id") == process_id]
if len(processes) != 1:
raise LookupError(f"Process {process_id!r} not found in process listing {namespace!r}")
(data,) = processes

# Some final validation.
assert "id" in data, "Process definition should at least have an 'id' field"
if process_id is not None and data["id"] != process_id:
raise LookupError(f"Expected process id {process_id!r}, but found {data['id']!r}")

return Process.from_dict(data)
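The listing-handling branch above selects a single process from a `{"processes": [...]}` document. That selection logic can be exercised in isolation; a sketch under the assumption of the same filtering approach (the `select_process` helper and sample listing are illustrative, not part of the library):

```python
def select_process(data: dict, process_id: str) -> dict:
    """Pick a single process definition from a process listing document."""
    processes = [p for p in data.get("processes", []) if p.get("id") == process_id]
    if len(processes) != 1:
        raise LookupError(f"Process {process_id!r} not found in process listing")
    # Unpack the single match (fails loudly if the invariant is violated)
    (selected,) = processes
    return selected


# Example listing document, as a remote process definition endpoint might return
listing = {"processes": [{"id": "fahrenheit_to_celsius"}, {"id": "ndvi"}]}
```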
48 changes: 43 additions & 5 deletions openeo/rest/_testing.py
@@ -1,10 +1,13 @@
import collections
import json
import re
from typing import Optional, Union
from typing import Callable, Iterator, Optional, Sequence, Union

from openeo import Connection, DataCube
from openeo.rest.vectorcube import VectorCube

OPENEO_BACKEND = "https://openeo.test/"


class OpeneoTestingException(Exception):
pass
@@ -23,6 +26,8 @@ class DummyBackend:
"validation_requests",
"next_result",
"next_validation_errors",
"job_status_updater",
"extra_job_metadata_fields",
)

# Default result (can serve both as JSON or binary data)
@@ -35,6 +40,14 @@ def __init__(self, requests_mock, connection: Connection):
self.validation_requests = []
self.next_result = self.DEFAULT_RESULT
self.next_validation_errors = []
self.extra_job_metadata_fields = []

# Job status update hook:
# a callable invoked when starting a job and when getting job metadata,
# allowing the status of a job to evolve dynamically.
# By default: immediately set to "finished" once the job is started.
self.job_status_updater = lambda job_id, current_status: "finished"

requests_mock.post(
connection.build_url("/result"),
content=self._handle_post_result,
@@ -70,9 +83,13 @@ def _handle_post_result(self, request, context):

def _handle_post_jobs(self, request, context):
"""handler of `POST /jobs` (create batch job)"""
pg = request.json()["process"]["process_graph"]
post_data = request.json()
pg = post_data["process"]["process_graph"]
job_id = f"job-{len(self.batch_jobs):03d}"
self.batch_jobs[job_id] = {"job_id": job_id, "pg": pg, "status": "created"}
job_data = {"job_id": job_id, "pg": pg, "status": "created"}
for field in self.extra_job_metadata_fields:
job_data[field] = post_data.get(field)
self.batch_jobs[job_id] = job_data
context.status_code = 201
context.headers["openeo-identifier"] = job_id
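The extra-metadata handling in `_handle_post_jobs` above is a selective projection of the POST payload into the stored job record, with missing fields recorded as `None`. A standalone sketch (the `build_job_data` helper is illustrative, not part of the library):

```python
def build_job_data(post_data: dict, job_id: str, extra_fields: list) -> dict:
    # Base job record, mirroring the dummy backend's bookkeeping
    job_data = {"job_id": job_id, "pg": post_data["process"]["process_graph"], "status": "created"}
    # Copy any requested extra metadata fields (absent ones become None)
    for field in extra_fields:
        job_data[field] = post_data.get(field)
    return job_data


# Example POST payload as a client would send to `POST /jobs`
post = {"process": {"process_graph": {"add": {}}}, "title": "My job"}
```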

@@ -88,13 +105,19 @@ def _handle_post_job_results(self, request, context):
"""Handler of `POST /job/{job_id}/results` (start batch job)."""
job_id = self._get_job_id(request)
assert self.batch_jobs[job_id]["status"] == "created"
# TODO: support custom status sequence (instead of directly going to status "finished")?
self.batch_jobs[job_id]["status"] = "finished"
self.batch_jobs[job_id]["status"] = self.job_status_updater(
job_id=job_id, current_status=self.batch_jobs[job_id]["status"]
)
context.status_code = 202

def _handle_get_job(self, request, context):
"""Handler of `GET /job/{job_id}` (get batch job status and metadata)."""
job_id = self._get_job_id(request)
# Allow updating status with `job_status_updater` once job got past status "created"
if self.batch_jobs[job_id]["status"] != "created":
self.batch_jobs[job_id]["status"] = self.job_status_updater(
job_id=job_id, current_status=self.batch_jobs[job_id]["status"]
)
return {"id": job_id, "status": self.batch_jobs[job_id]["status"]}

def _handle_get_job_results(self, request, context):
@@ -160,6 +183,21 @@ def execute(self, cube: Union[DataCube, VectorCube], process_id: Optional[str] =
cube.execute()
return self.get_pg(process_id=process_id)

def setup_simple_job_status_flow(self, *, queued: int = 1, running: int = 4, final: str = "finished"):
"""
Set up simple job status flow:
queued (a couple of times) -> running (a couple of times) -> finished/error.
"""
template = ["queued"] * queued + ["running"] * running + [final]
job_stacks = collections.defaultdict(template.copy)

def get_status(job_id: str, current_status: str) -> str:
stack = job_stacks[job_id]
# Pop from the front until only the final status remains, then keep returning it
return stack.pop(0) if len(stack) > 1 else stack[0]

self.job_status_updater = get_status
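The pop-first/repeat-last mechanics of `setup_simple_job_status_flow` can be sketched standalone: each job id gets its own copy of the status template (via `defaultdict`), and the final status repeats indefinitely once reached:

```python
import collections

# Status template: 1x queued, 2x running, then terminal status
template = ["queued"] * 1 + ["running"] * 2 + ["finished"]
# defaultdict gives every new job id a fresh copy of the template
job_stacks = collections.defaultdict(template.copy)


def get_status(job_id: str) -> str:
    stack = job_stacks[job_id]
    # Pop from the front until only the final status remains, then keep returning it
    return stack.pop(0) if len(stack) > 1 else stack[0]


statuses = [get_status("job-000") for _ in range(5)]
# → ["queued", "running", "running", "finished", "finished"]
```

Because the terminal status is never popped, repeated polling after completion stays stable, just like a real backend reporting a finished job.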


def build_capabilities(
*,
2 changes: 1 addition & 1 deletion openeo/rest/connection.py
@@ -1732,7 +1732,7 @@

def create_job(
self,
process_graph: Union[dict, str, Path],
process_graph: Union[dict, str, Path, FlatGraphableMixin],
*,
title: Optional[str] = None,
description: Optional[str] = None,