forked from asciipip/greenbutton-python
-
-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
22 changed files
with
1,177 additions
and
580 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
File renamed without changes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
from dataclasses import dataclass, field | ||
from typing import Dict, List, Optional | ||
|
||
from greenbutton_objects.data.atom import ContentType, EntryType, Feed | ||
|
||
|
||
@dataclass | ||
class HRefTreeNode: | ||
uri: str | ||
parent: Optional[str] = None | ||
contentType: type = type(None) | ||
content: List[ContentType] = field(default_factory=list) | ||
children: List[str] = field(default_factory=list) | ||
related: List[str] = field(default_factory=list) | ||
title: str = "" | ||
|
||
|
||
class HRefForest: | ||
def __init__(self) -> None: | ||
self.forest: Dict[str, HRefTreeNode] = {} | ||
|
||
def __ensure_container(self, uri: str) -> None: | ||
if uri not in self.forest: | ||
self.forest[uri] = HRefTreeNode(uri) | ||
|
||
def __link_parents(self) -> "HRefForest": | ||
for node in self.forest.values(): | ||
if node.parent: | ||
parent_node = self.forest.get(node.parent) | ||
if parent_node: | ||
parent_node.children.append(node.uri) | ||
return self | ||
|
||
def __ensure_containers(self) -> "HRefForest": | ||
for key in list(self.forest.keys()): | ||
node = self.forest[key] | ||
if node.parent: | ||
self.__ensure_container(node.parent) | ||
for related_uri in node.related: | ||
self.__ensure_container(related_uri) | ||
return self | ||
|
||
def __add_nodes(self, feed: Feed) -> "HRefForest": | ||
def entry_content_type(entry: EntryType) -> type: | ||
if entry.content and entry.content[0].content: | ||
content_type = type(entry.content[0].content[0]) | ||
else: | ||
content_type = type(None) | ||
return content_type | ||
|
||
for entry in feed.entry: | ||
related = [] | ||
parent = None | ||
uri = "" | ||
|
||
content_type = entry_content_type(entry) | ||
|
||
for link in entry.link: | ||
# Skip links without URIs | ||
if not link.href: | ||
continue | ||
if link.rel == "self": | ||
uri = link.href | ||
elif link.rel == "related": | ||
related.append(link.href) | ||
elif link.rel == "up": | ||
parent = link.href | ||
|
||
title = self.get_entry_title(entry) | ||
|
||
self.forest[uri] = HRefTreeNode( | ||
uri=uri, | ||
title=title, | ||
parent=parent, | ||
related=related, | ||
contentType=content_type, | ||
content=entry.content, | ||
) | ||
|
||
return self | ||
|
||
@staticmethod | ||
def get_entry_title(entry: EntryType) -> str: | ||
if entry.title: | ||
title_parts = [] | ||
for text in entry.title: | ||
if len(text.content) > 0: | ||
title_parts.append(text.content[0]) | ||
return "".join(title_parts) # type: ignore | ||
else: | ||
return "" | ||
|
||
def build(self, feed: Feed) -> "HRefForest": | ||
return self.__add_nodes(feed).__ensure_containers().__link_parents() | ||
|
||
def root_nodes(self) -> List[str]: | ||
return [node.uri for node in self.forest.values() if node.parent is None] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,104 @@ | ||
from dataclasses import dataclass, field | ||
from itertools import chain | ||
from typing import Any, Dict, Iterable, List, Optional, Sequence, Union | ||
|
||
from greenbutton_objects.atom.href_tree import HRefForest, HRefTreeNode | ||
from greenbutton_objects.util import get_first | ||
|
||
|
||
@dataclass | ||
class EntryNode: | ||
title: str | ||
uri: str | ||
content: Sequence[object] | ||
|
||
parent: Optional["EntryNode"] = None | ||
content_type: type = type(None) | ||
children_type: type = type(None) | ||
children: List["EntryNode"] = field(default_factory=list) | ||
# TODO: Should related be a list or dict keyed by type? | ||
related: List["EntryNode"] = field(default_factory=list) | ||
|
||
def infer_children_type(self) -> None: | ||
# We are making a big assumption here that all children are of the same type. | ||
# This is to make things simpler for the consumer who's using the library | ||
if self.children and self.children[0].content: | ||
content_ = self.children[0].content[0] | ||
if content_.content: # type: ignore | ||
self.children_type = content_.content[0].__class__ # type: ignore | ||
else: | ||
self.children_type = type(None) | ||
|
||
def first_content(self) -> Any: | ||
first_node = get_first(self.content) | ||
first_node_content = first_node.content # type: ignore | ||
return get_first(first_node_content) | ||
|
||
def get_related_of_type(self, elements_type: type) -> Iterable["EntryNode"]: | ||
containers = [obj for obj in self.related if obj.children_type is elements_type] | ||
if containers: | ||
elements: Iterable[EntryNode] = chain.from_iterable( | ||
container.children for container in containers | ||
) | ||
else: | ||
elements = [obj for obj in self.related if obj.content_type is elements_type] | ||
return elements | ||
|
||
def safe_get_content(self, content_type: type) -> Union[Any, None]: | ||
obj = get_first(self.get_related_of_type(content_type)) | ||
return obj.first_content() if obj else None | ||
|
||
|
||
class ObjectForest: | ||
def __init__( | ||
self, | ||
) -> None: | ||
self.__roots: List[EntryNode] = [] | ||
|
||
def build(self, href_forest: HRefForest) -> "ObjectForest": | ||
node_cache: Dict[str, EntryNode] = {} | ||
|
||
def add_node(href_node: HRefTreeNode) -> EntryNode: | ||
entry_node = EntryNode( | ||
title=href_node.title, | ||
uri=href_node.uri, | ||
content=href_node.content, | ||
content_type=href_node.contentType, | ||
) | ||
node_cache[href_node.uri] = entry_node | ||
return entry_node | ||
|
||
def build_tree(node_uri: str) -> EntryNode: | ||
if node_uri not in node_cache: | ||
href_node = href_forest.forest[node_uri] | ||
entry_node = add_node(href_node) | ||
|
||
# Children | ||
entry_node.children = [build_tree(child) for child in href_node.children] | ||
for child in entry_node.children: | ||
child.parent = entry_node | ||
entry_node.infer_children_type() | ||
|
||
# Relatives | ||
entry_node.related = [build_tree(child) for child in href_node.related] | ||
|
||
return node_cache[node_uri] | ||
|
||
self.__roots = [build_tree(uri) for uri in href_forest.root_nodes()] | ||
|
||
return self | ||
|
||
@staticmethod | ||
def get_elements_by_type(elements_type: type, source: list[EntryNode]) -> Iterable[EntryNode]: | ||
containers = [obj for obj in source if obj.children_type is elements_type] | ||
if containers: | ||
elements: Iterable[EntryNode] = chain.from_iterable( | ||
container.children for container in containers | ||
) | ||
else: | ||
elements = [obj for obj in source if obj.content_type is elements_type] | ||
return elements | ||
|
||
@property | ||
def roots(self) -> List[EntryNode]: | ||
return self.__roots |
Oops, something went wrong.