Commit 0c66e1e

Fix tests

ThibaudDauce committed May 23, 2024
1 parent 4b66010 commit 0c66e1e
Showing 5 changed files with 129 additions and 19 deletions.
11 changes: 0 additions & 11 deletions udata/core/dataset/rdf.py
@@ -75,17 +75,6 @@
    EUFREQ.NEVER: 'punctual',
}

# Map High Value Datasets URIs to keyword categories
EU_HVD_CATEGORIES = {
    "http://data.europa.eu/bna/c_164e0bf5": "Météorologiques",
    "http://data.europa.eu/bna/c_a9135398": "Entreprises et propriété d'entreprises",
    "http://data.europa.eu/bna/c_ac64a52d": "Géospatiales",
    "http://data.europa.eu/bna/c_b79e35eb": "Mobilité",
    "http://data.europa.eu/bna/c_dd313021": "Observation de la terre et environnement",
    "http://data.europa.eu/bna/c_e1da4e07": "Statistiques"
}


def temporal_to_rdf(daterange, graph=None):
    if not daterange:
        return
97 changes: 96 additions & 1 deletion udata/harvest/backends/base.py
@@ -8,6 +8,7 @@
import requests

from flask import current_app
from udata.core.dataservices.models import Dataservice
from voluptuous import MultipleInvalid, RequiredFieldInvalid

from udata.core.dataset.models import HarvestDatasetMetadata
@@ -156,6 +157,9 @@ def inner_harvest(self):
    def inner_process_dataset(self, item: HarvestItem) -> Dataset:
        raise NotImplementedError

    def inner_process_dataservice(self, item: HarvestItem) -> Dataservice:
        raise NotImplementedError

    def harvest(self):
        log.debug(f'Starting harvesting {self.source.name} ({self.source.url})…')
        factory = HarvestJob if self.dryrun else HarvestJob.objects.create
@@ -243,6 +247,53 @@ def process_dataset(self, remote_id: str, **kwargs) -> bool :

        return self.max_items and len(self.job.items) >= self.max_items

    def process_dataservice(self, remote_id: str, **kwargs) -> bool :
        '''
        Return `True` if the parent should stop iterating because we exceed the number
        of items to process.
        '''
        log.debug(f'Processing dataservice {remote_id}…')

        # TODO add `type` to `HarvestItem` to differentiate `Dataset` from `Dataservice`
        item = HarvestItem(status='started', started=datetime.utcnow(), remote_id=remote_id)
        self.job.items.append(item)
        self.save_job()

        try:
            dataservice = self.inner_process_dataservice(item, **kwargs)

            dataservice.harvest = self.update_harvest_info(dataservice.harvest, remote_id)
            dataservice.archived = None

            # TODO: Apply editable mappings

            if self.dryrun:
                dataservice.validate()
            else:
                dataservice.save()
            item.dataservice = dataservice
            item.status = 'done'
        except HarvestSkipException as e:
            item.status = 'skipped'

            log.info(f'Skipped item {item.remote_id} : {safe_unicode(e)}')
            item.errors.append(HarvestError(message=safe_unicode(e)))
        except HarvestValidationError as e:
            item.status = 'failed'

            log.info(f'Error validating item {item.remote_id} : {safe_unicode(e)}')
            item.errors.append(HarvestError(message=safe_unicode(e)))
        except Exception as e:
            item.status = 'failed'
            log.exception(f'Error while processing {item.remote_id} : {safe_unicode(e)}')

            error = HarvestError(message=safe_unicode(e), details=traceback.format_exc())
            item.errors.append(error)
        finally:
            item.ended = datetime.utcnow()
            self.save_job()

        return self.max_items and len(self.job.items) >= self.max_items

    def update_harvest_info(self, harvest: Optional[HarvestDatasetMetadata], remote_id: int):
        if not harvest:
@@ -319,6 +370,50 @@ def get_dataset(self, remote_id):
            return Dataset(owner=self.source.owner)

        return Dataset()

    def get_dataservice(self, remote_id):
        '''Get or create a dataservice given its remote ID (and its source)
        We first try to match `source_id` to be source domain independent
        '''
        dataservice = Dataservice.objects(__raw__={
            'harvest.remote_id': remote_id,
            '$or': [
                {'harvest.domain': self.source.domain},
                {'harvest.source_id': str(self.source.id)},
            ],
        }).first()

        if dataservice:
            return dataservice

        if self.source.organization:
            return Dataservice(organization=self.source.organization)
        elif self.source.owner:
            return Dataservice(owner=self.source.owner)

        return Dataservice()

    def get_dataservice(self, remote_id):
        '''Get or create a dataservice given its remote ID (and its source)
        We first try to match `source_id` to be source domain independent
        '''
        dataservice = Dataservice.objects(__raw__={
            'harvest.remote_id': remote_id,
            '$or': [
                {'harvest.domain': self.source.domain},
                {'harvest.source_id': str(self.source.id)},
            ],
        }).first()

        if dataservice:
            return dataservice

        if self.source.organization:
            return Dataservice(organization=self.source.organization)
        elif self.source.owner:
            return Dataservice(owner=self.source.owner)

        return Dataservice()

    def validate(self, data, schema):
        '''Perform a data validation against a given schema.
@@ -359,4 +454,4 @@ def validate(self, data, schema):
                    msg = str(error)
                errors.append(msg)
            msg = '\n- '.join(['Validation error:'] + errors)
            raise HarvestValidationError(msg)
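
Note: a minimal sketch of how a concrete backend could use the new dataservice hooks. `ExampleBackend` and `fetch_remote_ids` are invented for illustration, and the base class is assumed to be `BaseBackend`; only `inner_harvest`, `process_dataservice`, `inner_process_dataservice` and `get_dataservice` come from the code above.

# Sketch only: ExampleBackend and fetch_remote_ids are hypothetical.
from udata.core.dataservices.models import Dataservice
from udata.harvest.backends.base import BaseBackend
from udata.harvest.models import HarvestItem

class ExampleBackend(BaseBackend):
    def inner_harvest(self):
        # fetch_remote_ids stands in for whatever the remote API exposes.
        for remote_id in self.fetch_remote_ids():
            # process_dataservice returns truthy once max_items is reached.
            if self.process_dataservice(remote_id):
                break

    def inner_process_dataservice(self, item: HarvestItem) -> Dataservice:
        # get_dataservice reuses an existing dataservice for a known remote ID,
        # otherwise returns a fresh one owned by the source's org/owner.
        dataservice = self.get_dataservice(item.remote_id)
        dataservice.title = f'Remote service {item.remote_id}'  # placeholder mapping
        return dataservice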
27 changes: 21 additions & 6 deletions udata/harvest/backends/dcat.py
@@ -1,19 +1,17 @@
import logging

from rdflib import Graph, URIRef
from rdflib import Graph
from rdflib.namespace import RDF
import lxml.etree as ET
import boto3
from flask import current_app
from datetime import date
import json
from typing import List

from udata.core.dataset.models import Dataset
from udata.rdf import (
    DCAT, DCT, HYDRA, SPDX, namespace_manager, guess_format, url_from_rdf
)
from udata.core.dataset.rdf import dataset_from_rdf
from udata.core.dataservices.rdf import dataservice_from_rdf
from udata.storage.s3 import store_as_json, get_from_json
from udata.harvest.models import HarvestItem

@@ -71,8 +69,11 @@ def inner_harvest(self):
            lambda page_number, page: self.process_datasets(page_number, page),
        )

        # TODO call `walk_graph` with `process_dataservices`

        graphs = self.walk_graph(
            self.source.url,
            fmt,
            lambda page_number, page: self.process_dataservices(page_number, page),
        )
        serialized_graphs = [graph.serialize(format=fmt, indent=None) for graph in graphs]

        # The official MongoDB document size is 16MB. The default value here is 15MB to account for other fields in the document (and for the difference between * 1024 vs * 1000).
@@ -154,13 +155,27 @@ def process_datasets(self, page_number: int, page: Graph):

            if should_stop:
                return True

    def process_dataservices(self, page_number: int, page: Graph):
        for node in page.subjects(RDF.type, DCAT.DataService):
            remote_id = page.value(node, DCT.identifier)
            should_stop = self.process_dataservice(remote_id, page_number=page_number, page=page, node=node)

            if should_stop:
                return True

    def inner_process_dataset(self, item: HarvestItem, page_number: int, page: Graph, node):
        item.kwargs['page_number'] = page_number

        dataset = self.get_dataset(item.remote_id)
        return dataset_from_rdf(page, dataset, node=node)

    def inner_process_dataservice(self, item: HarvestItem, page_number: int, page: Graph, node):
        item.kwargs['page_number'] = page_number

        dataservice = self.get_dataservice(item.remote_id)
        return dataservice_from_rdf(page, dataservice, node=node)

    def get_node_from_item(self, graph, item):
        for node in graph.subjects(RDF.type, DCAT.Dataset):
            if str(graph.value(node, DCT.identifier)) == item.remote_id:
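
Note: the dataservice path mirrors the dataset one. Each page of the catalog graph is scanned for `dcat:DataService` subjects, and each subject's `dct:identifier` becomes the harvest item's remote ID. A self-contained rdflib sketch of the same enumeration (the Turtle sample is invented):

from rdflib import Graph
from rdflib.namespace import RDF, Namespace

DCAT = Namespace('http://www.w3.org/ns/dcat#')
DCT = Namespace('http://purl.org/dc/terms/')

page = Graph()
page.parse(format='turtle', data='''
    @prefix dcat: <http://www.w3.org/ns/dcat#> .
    @prefix dct:  <http://purl.org/dc/terms/> .
    <https://example.org/api> a dcat:DataService ;
        dct:identifier "service-42" .
''')

# Same pattern as process_dataservices above.
for node in page.subjects(RDF.type, DCAT.DataService):
    print(page.value(node, DCT.identifier))  # -> service-42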
2 changes: 2 additions & 0 deletions udata/harvest/models.py
@@ -3,6 +3,7 @@
import logging
from urllib.parse import urlparse

from udata.core.dataservices.models import Dataservice
from werkzeug.utils import cached_property

from udata.core.dataset.models import HarvestDatasetMetadata
@@ -53,6 +54,7 @@ class HarvestError(db.EmbeddedDocument):
class HarvestItem(db.EmbeddedDocument):
    remote_id = db.StringField()
    dataset = db.ReferenceField(Dataset)
    dataservice = db.ReferenceField(Dataservice)
    status = db.StringField(choices=list(HARVEST_ITEM_STATUS),
                            default=DEFAULT_HARVEST_ITEM_STATUS, required=True)
    created = db.DateTimeField(default=datetime.utcnow, required=True)
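
Note: with the new field, a harvest item references either a dataset or a dataservice. A small sketch (values invented, and `title` is assumed to be a Dataservice field):

from udata.core.dataservices.models import Dataservice
from udata.harvest.models import HarvestItem

item = HarvestItem(remote_id='service-42', status='done')
item.dataservice = Dataservice(title='Example API')  # normally a saved document
assert item.dataset is None  # the two references stay independent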
11 changes: 10 additions & 1 deletion udata/rdf.py
@@ -5,7 +5,7 @@
import logging
import re

from flask import request, url_for, abort
from flask import request, url_for, abort, current_app

from rdflib import Graph, Literal, URIRef
from rdflib.resource import Resource as RdfResource
@@ -101,6 +101,15 @@
# Includes control characters, unicode surrogate characters and unicode end-of-plane non-characters
ILLEGAL_XML_CHARS = '[\x00-\x08\x0b\x0c\x0e-\x1F\uD800-\uDFFF\uFFFE\uFFFF]'

# Map High Value Datasets URIs to keyword categories
EU_HVD_CATEGORIES = {
    "http://data.europa.eu/bna/c_164e0bf5": "Météorologiques",
    "http://data.europa.eu/bna/c_a9135398": "Entreprises et propriété d'entreprises",
    "http://data.europa.eu/bna/c_ac64a52d": "Géospatiales",
    "http://data.europa.eu/bna/c_b79e35eb": "Mobilité",
    "http://data.europa.eu/bna/c_dd313021": "Observation de la terre et environnement",
    "http://data.europa.eu/bna/c_e1da4e07": "Statistiques"
}

def guess_format(string):
    '''Guess format given an extension or a mime-type'''
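
Note: the mapping itself is unchanged; it moves from udata/core/dataset/rdf.py (deleted above) into udata/rdf.py so it can be shared across modules. Usage remains a plain dict lookup:

from udata.rdf import EU_HVD_CATEGORIES

uri = 'http://data.europa.eu/bna/c_164e0bf5'
print(EU_HVD_CATEGORIES.get(uri))  # -> Météorologiques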
