Commit 6ca9ad2: More updates
hectormachin committed Sep 1, 2023
1 parent 48505d5
Showing 6 changed files with 122 additions and 103 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
@@ -8,4 +8,4 @@ RUN pip install --no-cache-dir -r requirements.txt

COPY task.py .

CMD ["python3", "./task.py", "run"]
ENTRYPOINT ["python3", "./task.py"]
3 changes: 2 additions & 1 deletion input.json
@@ -156,7 +156,8 @@
]
},
"publish": {
"public": false
"public": false,
"stac_validate": true
}
}
}
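Note: the new stac_validate flag lets a payload opt out of validation; task.py (below) reads it with config.get("stac_validate", True), so payloads that omit it keep validating. Setting it to false skips the STAC check before upload.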
2 changes: 1 addition & 1 deletion requirements.txt
@@ -1,2 +1,2 @@
stactask==0.1.0
stac-validator
stac-validator==3.3.1
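The pin matters because task.py (below) drives this package directly: StacValidate, validate_dict, and the message list all appear in the new code. A minimal standalone sketch of that usage, with a placeholder item (the item contents here are assumed):

    from stac_validator import stac_validator

    stac = stac_validator.StacValidate()
    item = {"type": "Feature", "id": "example-item"}  # placeholder STAC Item dict
    if not stac.validate_dict(item):
        # stac.message is a list of result dicts; task.py reports error_message
        print(stac.message[0]["error_message"])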
208 changes: 115 additions & 93 deletions task.py
@@ -1,42 +1,39 @@
#!/usr/bin/env python
import boto3
import json
import logging
import os
from boto3utils import s3
from datetime import datetime, timezone
from dateutil.parser import parse as dateparse
from stactask import Task
from stactask.exceptions import InvalidInput
from stac_validator import stac_validator
from string import Formatter, Template
from typing import Any, Dict, List
from typing import Any, Dict, List, Tuple


s3_client = s3(requester_pays=False)

# Environment variables from the container
DATA_BUCKET = os.getenv("SWOOP_DATA_BUCKET")
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
AWS_SESSION_TOKEN = os.getenv("AWS_SESSION_TOKEN")
AWS_DEFAULT_REGION = os.getenv("AWS_DEFAULT_REGION")

session = boto3.Session(
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
aws_session_token=AWS_SESSION_TOKEN,
region_name=AWS_DEFAULT_REGION,
)

s3session = s3(session, requester_pays=False)


class Publish(Task):
name = "publish"
description = "Publishes an input payload to S3."
version = "0.1.0"

@classmethod
def validate(cls, payload: dict[str, Any]) -> bool:
if "publish" not in payload["process"]["tasks"]:
raise InvalidInput(
"Publish needs to be specified as a task in the input payload"
)
return True

def get_path(item: dict, template: str = "${collection}/${id}") -> str:
def get_path(self, item: dict, template: str = "${collection}/${id}") -> str:
"""Get path name based on STAC Item and template string
Args:
@@ -67,104 +64,129 @@ def get_path(item: dict, template: str = "${collection}/${id}") -> str:
subs[key] = item["properties"][key.replace("__colon__", ":")]
return Template(_template).substitute(**subs).replace("__colon__", ":")

def publish_items_to_s3(payload, bucket, public) -> Dict:
opts = payload.get("process", {}).get("upload_options", {})
for item in payload["features"]:
# determine URL of data bucket to publish to - always do this
url = os.path.join(
Publish.get_path(item, opts.get("path_template")), f"{item['id']}.json"
)

if url[0:5] != "s3://":
url = f"s3://{bucket}/{url.lstrip('/')}"
if public:
url = s3.s3_to_https(url)

# add canonical and self links (and remove existing self link if present)
item["links"] = [
link
for link in item["links"]
if link["rel"] not in ["self", "canonical"]
]
item["links"].insert(
0, {"rel": "canonical", "href": url, "type": "application/json"}
)
item["links"].insert(
0, {"rel": "self", "href": url, "type": "application/json"}
)

# get S3 session
# TO-DO: ADD requester_pays to secret?

session = boto3.Session(
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
aws_session_token=AWS_SESSION_TOKEN,
region_name=AWS_DEFAULT_REGION,
)

s3session = s3(session, requester_pays=False)

# if existing item use created date
now = datetime.now(timezone.utc).isoformat()
created = None
if s3session.exists(url):
old_item = s3session.read_json(url)
created = old_item["properties"].get("created", None)
if created is None:
created = now
item["properties"]["created"] = created
item["properties"]["updated"] = now

# publish to bucket
headers = opts.get("headers", {})

extra = {"ContentType": "application/json"}
extra.update(headers)
s3session.upload_json(item, url, public=public, extra=extra)
logging.info("Published to s3")

return payload

def process(self, public: bool) -> List[Dict[str, Any]]:
def update_links(
self, item: Dict, template: str, bucket: str, public: bool
) -> Tuple[Dict, str]:
"""Updates the links of an item to include self and canonical links.
Args:
item (Dict): A STAC Item.
template (str, optional): Path template using variables referencing Item fields. Defaults to '${collection}/${id}'.
bucket (str): Name of S3 bucket which will be used in the href for the links.
public (bool): Boolean value specifying if the S3 bucket is public or private.
Returns:
Tuple[Dict, str]: A tuple consisting of an updated STAC item and its S3 url.
"""
url = os.path.join(self.get_path(item, template), f"{item['id']}.json")

if url[0:5] != "s3://":
url = f"s3://{bucket}/{url.lstrip('/')}"
if public:
url = s3.s3_to_https(url)

# add canonical and self links (and remove existing self link if present)
item["links"] = [
link for link in item["links"] if link["rel"] not in ["self", "canonical"]
]
item["links"].insert(
0, {"rel": "canonical", "href": url, "type": "application/json"}
)
item["links"].insert(
0, {"rel": "self", "href": url, "type": "application/json"}
)

return item, url

def update_item_dates(self, item: Dict, url: str) -> Dict:
"""Populates an item's 'created' and 'updated' properties by checking to see
if the item already exists on S3.
Args:
item (Dict): A STAC Item.
url (str): Path to the item on S3 after templating its properties into
the path_template parameter.
Returns:
Dict: The updated STAC Item.
"""
now = datetime.now(timezone.utc).isoformat()
created = None
if s3session.exists(url):
old_item = s3session.read_json(url)
created = old_item["properties"].get("created", None)
if created is None:
created = now
item["properties"]["created"] = created
item["properties"]["updated"] = now

return item

def publish_item_to_s3(self, item: Dict, url: str, headers: Dict, public: bool):
"""Publishes an item to S3 at a specified url.
Args:
item (Dict): A STAC Item.
url (str): Path to the item on S3 after templating its properties into
the path_template parameter.
headers (Dict): Headers to include in the request to upload to S3.
public (bool): Boolean value specifying if the S3 bucket is public or private.
Returns:
None
"""
extra = {"ContentType": "application/json"}
extra.update(headers)
s3session.upload_json(item, url, public=public, extra=extra)
logging.info("Published to s3")

def process(self, public: bool, stac_validate: bool) -> List[Dict[str, Any]]:
# process method overrides Task
created_items = []
payload = self._payload
item = self.items[0]

# We shouldn't have to mess with the payload or pull out config options
# once stac-task supports process arrays. When we get a new version of
# stac-task with that support we can clean this code up.

process = (
payload["process"][0]
if isinstance(payload["process"], list)
else payload["process"]
)

upload_options = process.get("upload_options", {})
path_template = upload_options.get("path_template", {})
headers = upload_options.get("headers", {})
config = process.get("tasks", {}).get("publish", {})
public = config.get("public", False)
stac_validate = config.get("stac_validate", True)

items = self.items_as_dicts

try:
logging.debug("Publishing items to S3")

# publish to s3
mod_payload = Publish.publish_items_to_s3(payload, DATA_BUCKET, public)
mod_items = []

except Exception as err:
msg = f"publish: failed publishing output items ({err})"
logging.error(msg, exc_info=True)
raise Exception(msg) from err
stac = stac_validator.StacValidate()

for item in items:
link_item, url = self.update_links(
item, path_template, DATA_BUCKET, public
)

# STAC-validate item in payload before completing
mod_item = self.update_item_dates(link_item, url)

item = mod_payload["features"][0]
mod_items.append(mod_item)
if stac_validate and not stac.validate_dict(mod_item):
raise Exception(
f"STAC Item validation failed. Error: {stac.message[0]['error_message']}."
)
self.publish_item_to_s3(mod_item, url, headers, public)

stac = stac_validator.StacValidate()
valid = stac.validate_dict(item)
return mod_items

if valid:
created_items.append(item)
return created_items
else:
raise Exception(
f"STAC Item validation failed. Error: {stac.message[0]['error_message']}."
)
except Exception as err:
msg = f"publish: failed publishing output items ({err})"
logging.exception(msg)
raise


def handler(event: dict[str, Any], context: dict[str, Any] = {}) -> Task:
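Putting the new pieces together, process now reads the publish config, rewrites each item's self and canonical links via update_links, stamps created/updated via update_item_dates, optionally validates, and uploads via publish_item_to_s3. A hedged sketch of invoking the handler above locally (bucket setup elided; the payload values are assumptions, and features is left empty for brevity):

    payload = {
        "type": "FeatureCollection",
        "features": [],  # STAC Items to publish
        "process": {
            "upload_options": {
                "path_template": "${collection}/${id}",  # expanded by get_path
                "headers": {},
            },
            "tasks": {"publish": {"public": False, "stac_validate": True}},
        },
    }
    handler(payload)

With that template, an item with collection "c1" and id "i1" would be written to s3://<SWOOP_DATA_BUCKET>/c1/i1/i1.json, since get_path yields "c1/i1" and the item filename is appended.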
2 changes: 1 addition & 1 deletion version.json
@@ -1,3 +1,3 @@
{
"version": "v0.0.2"
"version": "v0.0.3"
}
8 changes: 2 additions & 6 deletions workflow-template.yaml
@@ -14,8 +14,6 @@ spec:
{{- $sessionTokenSecretKey := .Values.s3.sessionTokenSecret.key -}}
{{- $dataBucketSecretName := .Values.s3.dataBucketSecret.name -}}
{{- $dataBucketSecretKey := .Values.s3.dataBucketSecret.key -}}
{{- $apiUrlSecretName := .Values.s3.apiUrlSecret.name -}}
{{- $apiUrlSecretKey := .Values.s3.apiUrlSecret.key -}}
{{- $serviceAccountName := .Values.publishTemplateTaskServiceAccountName -}}
# End of global variable mappings
serviceAccountName: {{ $serviceAccountName }}
@@ -56,14 +54,12 @@ spec:
secretKeyRef:
name: {{ $dataBucketSecretName }}
key: {{ $dataBucketSecretKey }}
{{- end }}

# should pin to a specific version or label here
image: quay.io/element84/publish-stac-task:latest
imagePullPolicy: IfNotPresent
command:
- python3
- ./task.py
args:
- run
- /mnt/input.json
- --output
- /mnt/output.json
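A note on how the Dockerfile and template changes fit together: the image now bakes python3 ./task.py into ENTRYPOINT, so the workflow drops its command override and passes only args, which Kubernetes appends to the entrypoint. Purely as illustration:

    entrypoint = ["python3", "./task.py"]  # from the Dockerfile ENTRYPOINT
    args = ["run", "/mnt/input.json", "--output", "/mnt/output.json"]  # from this template
    print(" ".join(entrypoint + args))
    # -> python3 ./task.py run /mnt/input.json --output /mnt/output.json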
