Skip to content

Commit

Permalink
Merge pull request #1 from eccenca/feature/configuableFixedInputSchem…
Browse files Browse the repository at this point in the history
…a-CMEM-5263

add advanced options to control input schema + better error messages
  • Loading branch information
seebi authored Nov 29, 2023
2 parents 82a4598 + 655ee44 commit 42b059d
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 78 deletions.
2 changes: 1 addition & 1 deletion .copier-answers.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Changes here will be overwritten by Copier
_commit: v5.3.2-18-gc45709d
_commit: v6.0.1
_src_path: gh:eccenca/cmem-plugin-template
author_mail: [email protected]
author_name: eccenca GmbH
Expand Down
46 changes: 33 additions & 13 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,37 @@
repos:
- repo: local
hooks:
- id: pylint
name: pylint
entry: poetry run pylint
exclude: '^tests/.*$'
language: system
types: [python]
- repo: local
hooks:
- id: forbidden-files
name: forbidden files
entry: found copier update rejection files; review them and remove them
language: fail
files: "\\.rej$"

- id: ruff
name: check:ruff
entry: task check:ruff
language: python
types_or: [python, pyi]
pass_filenames: false

- id: poetry-check
name: poetry-check
description: run poetry check to validate config
entry: poetry check
language: python
pass_filenames: false
files: ^(.*/)?pyproject\.toml$

- id: poetry-lock
name: poetry-lock
description: run poetry lock to update lock file
entry: poetry lock
language: python
pass_filenames: false
files: ^(.*/)?(poetry\.lock|pyproject\.toml)$

- id: poetry-install
name: poetry-install
description: >
run poetry install to install dependencies from the lock file
entry: poetry install
language: python
pass_filenames: false
stages: [post-checkout, post-merge]
always_run: true

8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/) and this p
...


## [0.6.0] 2023-11-29

### Added

- advanced options to control input schema (type + property)
- better error messages for cases with missing data (no entity, no value)


## [0.5.0] 2023-11-16

### Added
Expand Down
1 change: 1 addition & 0 deletions Taskfile.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ tasks:
<<: *preparation
deps:
- clean
- poetry:check
cmds:
- poetry build

Expand Down
148 changes: 108 additions & 40 deletions cmem_plugin_yaml/parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from collections.abc import Sequence
from pathlib import Path
from tempfile import mkdtemp
from types import SimpleNamespace
from typing import BinaryIO

import yaml
Expand All @@ -27,39 +28,42 @@
from cmem_plugin_base.dataintegration.plugins import PluginLogger, WorkflowPlugin
from cmem_plugin_base.dataintegration.ports import (
FixedNumberOfInputs,
FixedSchemaPort,
FlexibleSchemaPort,
)
from cmem_plugin_base.dataintegration.utils import setup_cmempy_user_access

SOURCE_INPUT = "entities"
SOURCE_CODE = "code"
SOURCE_FILE = "file"
SOURCE_OPTIONS = OrderedDict(
SOURCE = SimpleNamespace()
SOURCE.entities = "entities"
SOURCE.code = "code"
SOURCE.file = "file"
SOURCE.options = OrderedDict(
{
SOURCE_INPUT: f"{SOURCE_INPUT}: "
SOURCE.entities: f"{SOURCE.entities}: "
"Content is parsed from of the input port in a workflow (default).",
SOURCE_CODE: f"{SOURCE_CODE}: " "Content is parsed from the YAML code field below.",
SOURCE_FILE: f"{SOURCE_FILE}: "
"Content is parsed from an uploaded project file resource (advanced option).",
SOURCE.code: f"{SOURCE.code}: " "Content is parsed from the YAML code field below.",
SOURCE.file: f"{SOURCE.file}: "
"Content is parsed from an uploaded project file resource (see advanced options).",
}
)

TARGET_ENTITIES = "entities"
TARGET_JSON_ENTITIES = "json_entities"
TARGET_JSON_DATASET = "json_dataset"
TARGET_OPTIONS = OrderedDict(
TARGET = SimpleNamespace()
TARGET.entities = "entities"
TARGET.json_entities = "json_entities"
TARGET.json_dataset = "json_dataset"
TARGET.options = OrderedDict(
{
TARGET_JSON_ENTITIES: f"{TARGET_JSON_ENTITIES}: "
TARGET.json_entities: f"{TARGET.json_entities}: "
"Parsed structure will be sent as JSON entities to the output port (current default).",
TARGET_JSON_DATASET: f"{TARGET_JSON_DATASET}: "
"Parsed structure will be is saved in a JSON dataset (advanced option).",
TARGET_ENTITIES: f"{TARGET_ENTITIES}: "
TARGET.json_dataset: f"{TARGET.json_dataset}: "
"Parsed structure will be is saved in a JSON dataset (see advanced options).",
TARGET.entities: f"{TARGET.entities}: "
"Parsed structure will be send as entities to the output port "
"(not implemented yet, later default).",
}
)

DEFAULT_YAML = YamlCode(f"# enter your YAML code here (and select '{SOURCE_CODE}' as input mode.")
DEFAULT_YAML = YamlCode(f"# Add your YAML code here (and select '{SOURCE.code}' as input mode).")


@Plugin(
Expand All @@ -71,16 +75,16 @@
parameters=[
PluginParameter(
name="source_mode",
label="Source",
label="Source / Input Mode",
description="",
param_type=ChoiceParameterType(SOURCE_OPTIONS),
param_type=ChoiceParameterType(SOURCE.options),
),
PluginParameter(
name="target_mode",
label="Target",
label="Target / Output Mode",
description="",
param_type=ChoiceParameterType(TARGET_OPTIONS),
default_value=TARGET_JSON_ENTITIES,
param_type=ChoiceParameterType(TARGET.options),
default_value=TARGET.json_entities,
),
PluginParameter(
name="source_code",
Expand All @@ -104,6 +108,20 @@
advanced=True,
default_value="",
),
PluginParameter(
name="input_schema_type",
label="Input Schema Type / Class",
description=f"In case of source mode '{SOURCE.entities}', you can specify the "
"requested input type.",
advanced=True,
),
PluginParameter(
name="input_schema_path",
label="Input Schema Path / Property",
description=f"In case of source mode '{SOURCE.entities}', you can specify the "
"requested input path.",
advanced=True,
),
],
)
class ParseYaml(WorkflowPlugin):
Expand All @@ -116,6 +134,8 @@ class ParseYaml(WorkflowPlugin):
source_code: str
source_file: str
target_dataset: str
input_schema_type: str
input_schema_path: str

inputs: Sequence[Entities]
execution_context: ExecutionContext
Expand All @@ -124,18 +144,22 @@ class ParseYaml(WorkflowPlugin):

def __init__( # noqa: PLR0913
self,
source_mode: str = SOURCE_INPUT,
target_mode: str = TARGET_ENTITIES,
source_mode: str = SOURCE.entities,
target_mode: str = TARGET.entities,
source_code: YamlCode = DEFAULT_YAML,
source_file: str = "",
target_dataset: str = "",
input_schema_type: str = "urn:x-eccenca:yaml-document",
input_schema_path: str = "text",
) -> None:
# pylint: disable=too-many-arguments
self.source_mode = source_mode
self.target_mode = target_mode
self.source_code = str(source_code)
self.source_file = source_file
self.target_dataset = target_dataset
self.input_schema_path = input_schema_path
self.input_schema_type = input_schema_type
self._validate_config()
self._set_ports()

Expand All @@ -151,34 +175,62 @@ def _raise_error(self, message: str) -> None:

def _set_ports(self) -> None:
"""Define input/output ports based on the configuration"""
self.input_ports = FixedNumberOfInputs([])
self.output_port = None

if self.source_mode == SOURCE_INPUT:
self.input_ports = FixedNumberOfInputs([FlexibleSchemaPort()])
if self.target_mode in (TARGET_ENTITIES, TARGET_JSON_ENTITIES):
self.output_port = FlexibleSchemaPort()
match self.source_mode:
case SOURCE.file:
# no input port
self.input_ports = FixedNumberOfInputs([])
case SOURCE.code:
# no input port
self.input_ports = FixedNumberOfInputs([])
case SOURCE.entities:
# single input port with fixed minimal schema
self.input_ports = FixedNumberOfInputs(
[
FixedSchemaPort(
schema=EntitySchema(
type_uri=self.input_schema_type,
paths=[EntityPath(self.input_schema_path)],
)
)
]
)
case _:
raise ValueError(f"Unknown source mode: {self.source_mode}")
match self.target_mode:
case TARGET.entities:
# output port with flexible schema
self.output_port = FlexibleSchemaPort()
case TARGET.json_entities:
# output port with fixed schema
self.output_port = FixedSchemaPort(
schema=EntitySchema(type_uri="json-document", paths=[EntityPath("text")])
)
case TARGET.json_dataset:
# not output port
self.output_port = None
case _:
raise ValueError(f"Unknown target mode: {self.target_mode}")

def _validate_config(self) -> None:
"""Raise value errors on bad configurations"""
if self.source_mode == SOURCE_CODE and str(self.source_code) == "":
if self.source_mode == SOURCE.code and str(self.source_code) == "":
self._raise_error(
f"When using the source mode '{SOURCE_CODE}', "
f"When using the source mode '{SOURCE.code}', "
"you need to enter or paste YAML Source Code in the code field."
)
if self.source_mode == SOURCE_FILE:
if self.source_mode == SOURCE.file:
if self.source_file == "":
self._raise_error(
f"When using the source mode '{SOURCE_FILE}', "
f"When using the source mode '{SOURCE.file}', "
"you need to select a YAML file."
)
if hasattr(self, "execution_context") and not resource_exist(
self.project, self.source_file
):
self._raise_error(f"The file '{self.source_file}' does not exist in the project.")
if self.target_mode == TARGET_JSON_DATASET and self.target_dataset == "":
if self.target_mode == TARGET.json_dataset and self.target_dataset == "":
self._raise_error(
f"When using the target mode '{TARGET_JSON_DATASET}', "
f"When using the target mode '{TARGET.json_dataset}', "
"you need to select a JSON dataset."
)

Expand All @@ -194,9 +246,25 @@ def _get_input_code(self, writer: BinaryIO) -> None:

def _get_input_entities(self, writer: BinaryIO) -> None:
"""Get input YAML from fist path of first entity of first input"""
first_input: Entities = self.inputs[0]
first_entity: Entity = next(first_input.entities)
first_value: str = next(iter(first_entity.values))[0]
try:
first_input: Entities = self.inputs[0]
except IndexError as error:
raise ValueError("Input port not connected.") from error
try:
first_entity: Entity = next(first_input.entities)
except StopIteration as error:
raise ValueError(
"No entity available on input port. "
"Maybe you can re-configure the Input Schema Type / Class in Advanced Options?"
) from error
try:
first_value: str = next(iter(first_entity.values))[0]
except IndexError as error:
raise ValueError(
"No value available in entity. "
"Maybe you can re-configure the input Input Schema Path / Property "
"in Advanced Options?"
) from error
writer.write(first_value.encode("utf-8"))

def _get_input(self) -> Path:
Expand Down
Loading

0 comments on commit 42b059d

Please sign in to comment.