Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SatVu backend #1

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api/backends/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
from api.backends.fake_backend import FakeBackend
from api.backends.planet_backend import PlanetBackend
from api.backends.umbra_backend import UmbraBackend
from api.backends.satvu_backend import SatVuBackend

BACKENDS: dict[str, Backend] = {
"fake": FakeBackend(), # type: ignore
"earthsearch": EarthSearchBackend(), # type: ignore
"blacksky": BlackskyBackend(), # type: ignore
"planet": PlanetBackend(), # type: ignore
"umbra": UmbraBackend(), # type: ignore
"satvu": SatVuBackend(),
}
231 changes: 231 additions & 0 deletions api/backends/satvu_backend.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
from anyio import sleep
from enum import Enum
from logging import getLogger
from os import getenv
from urllib.parse import urljoin
from typing import Literal

from geojson_pydantic import Point, Feature
from pydantic import BaseModel
from pystac import ProviderRole
from requests import Session, Request
from requests.auth import AuthBase
from requests.adapters import HTTPAdapter, Retry, BaseAdapter
from starlette import status
from stac_pydantic.links import Link

from api.models import Opportunity, Order, Product, Provider

"""
http \
post \
localhost:8000/opportunities \
backend:satvu \
Authorization:"Bearer ${TOKEN}" \
geometry:='{"type": "Point", "coordinates": [12, 52]}' \
product_id="standard-scene" datetime="2024-01-20T00:00:00/2024-02-29T00:00:00"
"""

SATVU_CONTRACT_ID_ENV = "SATVU_CONTRACT_ID"

logger = getLogger(__name__)


class FeasibilityRequestStatus(str, Enum):
pending = "pending"
processing = "processing"
failed = "failed"
feasible = "feasible"
not_feasible = "not feasible"


class StandardScene(Product):
id = "standard-scene"
title = "Standard scene"
description = "Standard tasking product"
license = "proprietary"
links: list[Link] = (
[
Link(
href="https://docs.satellitevu.com/guides/tasking/",
title="Tasking guide",
rel="docs",
type="text/html",
),
],
)
providers: list[Provider] = [
Provider(
name="SatVu",
description="",
roles=[
ProviderRole.LICENSOR,
ProviderRole.PROCESSOR,
ProviderRole.PRODUCER,
ProviderRole.HOST,
],
url="https://satellitevu.com",
)
]


class RequestPayloadProperties(BaseModel):
datetime: str


class RequestPayload(Feature[Point, RequestPayloadProperties]):
type: Literal["Feature"]
geometry: Point

@classmethod
def from_opportunity(cls, opportunity: Opportunity) -> "RequestPayload":
if opportunity.geometry.type != "Point":
raise RuntimeError("only POINT geometries are supported")

return RequestPayload(
type="Feature",
geometry=opportunity.geometry,
properties=RequestPayloadProperties(
datetime=opportunity.datetime,
**(opportunity.constraints or {}),
),
)


class ResponsePayload(BaseModel):
links: list[Link]


class BearerTokenAuth(AuthBase):
token: str

def __init__(self, token: str):
self.token = token

def __call__(self, request: Request):
request.headers["Authorization"] = f"Bearer {self.token}"
return request


def new_session(token: str) -> Session:
retries = Retry(
total=5,
backoff_factor=0.1,
status_forcelist=[
status.HTTP_500_INTERNAL_SERVER_ERROR,
status.HTTP_502_BAD_GATEWAY,
status.HTTP_503_SERVICE_UNAVAILABLE,
status.HTTP_504_GATEWAY_TIMEOUT,
],
)

session = Session()
session.auth = BearerTokenAuth(token)

session.mount("http://", BaseAdapter)
session.mount("https://", HTTPAdapter(max_retries=retries))

return session


class SatVuBackend:
BASE_URL = "https://api.qa.satellitevu.com/otm/v2/"
_contract_id: str

ASYNC_MAX_POLL_WAIT = 60
ASYNC_POLL_WAIT = 0.5

def __init__(self) -> None:
self._contract_id = getenv(SATVU_CONTRACT_ID_ENV)

async def find_products(
self,
token: str,
) -> list[Product]:
"""Get a list of all Products"""
return [StandardScene]

async def place_order(
self,
search: Opportunity,
token: str,
) -> Order:
"""Given an Opportunity, place an order"""
return NotImplemented

async def find_opportunities(
self,
search: Opportunity,
token: str,
) -> list[Opportunity]:
"""Given an Opportunity, get a list of Opportunites that fulfill it"""
if not self._contract_id:
raise RuntimeError(f"{SATVU_CONTRACT_ID_ENV} is not set.")

session = new_session(token)

try:
# SatVu's OTM service is async, so...
# 1. Send feasibility request...
payload = RequestPayload.from_opportunity(search)
url = urljoin(
self.BASE_URL, f"./{self._contract_id}/tasking/feasibilities/"
)
data = payload.dict(exclude_unset=True)
logger.debug(
{
"message": "sending feasibility request",
"url": url,
"payload": data,
}
)

res = session.post(url, json=data)
res.raise_for_status()

data = res.json()
logger.debug(
{
"message": "received feasibility request",
"status code": res.status_code,
"payload": data,
}
)
item = ResponsePayload(**data)
response_url = next(
(link for link in item.links if link.rel == "self")
).href

# 2. Poll feasibility request status...
# SatVu's standard scene product opportunity is just a boolean basically
for _ in range(int(self.ASYNC_MAX_POLL_WAIT / self.ASYNC_POLL_WAIT)):
res = session.get(response_url)
res.raise_for_status()
data = res.json()
logger.debug(
{
"message": "polling...",
"status code": res.status_code,
"payload": data,
}
)

status = data["properties"]["status"]
match status:
case FeasibilityRequestStatus.feasible:
search.id = data["id"]
return [search]
case FeasibilityRequestStatus.not_feasible:
return []
case FeasibilityRequestStatus.failed:
raise RuntimeError("Opportunity request failed")
await sleep(self.ASYNC_POLL_WAIT)
except Exception as e:
logger.exception(
{
"message": "OTM request failed",
}
)
raise e

raise RuntimeError("Opportunity request timed out")
5 changes: 5 additions & 0 deletions api/main.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
from datetime import datetime, timedelta
from logging import basicConfig
from functools import wraps
from typing import Tuple

Expand All @@ -19,6 +20,10 @@

DEFAULT_BACKEND = os.environ.get("DEFAULT_BACKEND", "earthsearch")

log_level = os.getenv("LOG_LEVEL")
if log_level is not None:
basicConfig(level=log_level.upper())


def throw(func):
@wraps(func)
Expand Down
3 changes: 2 additions & 1 deletion api/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ class Opportunity(OpportunityProperties):

def to_feature(self) -> OpportunityFeature:
return OpportunityFeature(
type="Feature",
id=self.id,
geometry=self.geometry,
properties=OpportunityProperties(
Expand Down Expand Up @@ -172,4 +173,4 @@ def to_json(self, **kwargs: Any) -> str:
def from_opportunities(
cls, opportunities: list[Opportunity]
) -> "OpportunityCollection":
return cls(features=[o.to_feature() for o in opportunities])
return cls(type="FeatureCollection", features=[o.to_feature() for o in opportunities])