Skip to content

Commit

Permalink
CSVGraphStoreRowProcessor ignores the default graph
Browse files Browse the repository at this point in the history
  • Loading branch information
namedgraph committed Oct 17, 2023
1 parent 4bec544 commit a1abef1
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 59 deletions.
3 changes: 1 addition & 2 deletions src/main/java/com/atomgraph/linkeddatahub/Application.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@
import com.atomgraph.linkeddatahub.server.interceptor.UpdateRequestCleanupInterceptor;
import com.atomgraph.linkeddatahub.server.mapper.auth.oauth2.TokenExpiredExceptionMapper;
import com.atomgraph.linkeddatahub.server.model.impl.Dispatcher;
import com.atomgraph.linkeddatahub.server.model.impl.GraphStoreImpl;
import com.atomgraph.linkeddatahub.server.security.AgentContext;
import com.atomgraph.linkeddatahub.server.security.AuthorizationContext;
import com.atomgraph.linkeddatahub.server.util.MessageBuilder;
Expand Down Expand Up @@ -1282,7 +1281,7 @@ public Map<Integer, Resource> getLengthMap(Map<URI, Resource> apps)
*/
public void submitImport(CSVImport csvImport, com.atomgraph.linkeddatahub.apps.model.Application app, Service service, Service adminService, String baseURI, LinkedDataClient ldc)
{
new ImportExecutor(importThreadPool).start(service, adminService, baseURI, ldc, service.getGraphStoreClient(), GraphStoreImpl.CREATE_GRAPH, csvImport);
new ImportExecutor(importThreadPool).start(service, adminService, baseURI, ldc, service.getGraphStoreClient(), csvImport);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,8 @@ public ImportExecutor(ExecutorService execService)
* @param appBaseURI application's base URI
* @param ldc Linked Data client
* @param graphStoreClient GSP client
* @param createGraph function that derives graph URI from a document model
*/
public void start(Service service, Service adminService, String appBaseURI, LinkedDataClient ldc, GraphStoreClient graphStoreClient, Function<Model, Resource> createGraph, CSVImport csvImport)
public void start(Service service, Service adminService, String appBaseURI, LinkedDataClient ldc, GraphStoreClient graphStoreClient, CSVImport csvImport)
{
if (csvImport == null) throw new IllegalArgumentException("CSVImport cannot be null");
if (log.isDebugEnabled()) log.debug("Submitting new import to thread pool: {}", csvImport.toString());
Expand All @@ -128,7 +127,7 @@ public void start(Service service, Service adminService, String appBaseURI, Link
Supplier<Response> fileSupplier = new ClientResponseSupplier(ldc, CSV_MEDIA_TYPES, URI.create(csvImport.getFile().getURI()));
// skip validation because it will be done during final POST anyway
CompletableFuture.supplyAsync(fileSupplier, getExecutorService()).thenApplyAsync(getStreamRDFOutputWriter(service, adminService,
graphStoreClient, queryBaseURI, query, createGraph, csvImport), getExecutorService()).
graphStoreClient, queryBaseURI, query, csvImport), getExecutorService()).
thenAcceptAsync(success(service, csvImport, provImport), getExecutorService()).
exceptionally(failure(service, csvImport, provImport));
}
Expand Down Expand Up @@ -311,13 +310,12 @@ protected void appendProvGraph(Resource provImport, DatasetAccessor accessor)
* @param graphStoreClient GSP client
* @param baseURI base URI
* @param query transformation query
* @param createGraph function that derives graph URI from a document model
* @param imp import resource
* @return function
*/
protected Function<Response, CSVGraphStoreOutput> getStreamRDFOutputWriter(Service service, Service adminService, GraphStoreClient graphStoreClient, String baseURI, Query query, Function<Model, Resource> createGraph, CSVImport imp)
protected Function<Response, CSVGraphStoreOutput> getStreamRDFOutputWriter(Service service, Service adminService, GraphStoreClient graphStoreClient, String baseURI, Query query, CSVImport imp)
{
return new CSVGraphStoreOutputWriter(service, adminService, graphStoreClient, baseURI, query, createGraph, imp.getDelimiter());
return new CSVGraphStoreOutputWriter(service, adminService, graphStoreClient, baseURI, query, imp.getDelimiter());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,7 @@
import com.univocity.parsers.csv.CsvParser;
import com.univocity.parsers.csv.CsvParserSettings;
import java.io.Reader;
import java.util.function.Function;
import org.apache.jena.query.Query;
import org.apache.jena.rdf.model.Model;
import org.apache.jena.rdf.model.Resource;

/**
* RDF output stream.
Expand Down Expand Up @@ -53,18 +50,17 @@ public class CSVGraphStoreOutput // extends com.atomgraph.etl.csv.stream.CSVStre
* @param reader CSV reader
* @param base application base URI
* @param query <code>CONSTRUCT</code> transformation query
* @param createGraph function that derives graph URI from a document model
* @param delimiter CSV delimiter
* @param maxCharsPerColumn max number of characters per column
*/
public CSVGraphStoreOutput(Service service, Service adminService, GraphStoreClient graphStoreClient, Reader reader, String base, Query query, Function<Model, Resource> createGraph, char delimiter, Integer maxCharsPerColumn)
public CSVGraphStoreOutput(Service service, Service adminService, GraphStoreClient graphStoreClient, Reader reader, String base, Query query, char delimiter, Integer maxCharsPerColumn)
{
this.base = base;
this.reader = reader;
this.query = query;
this.delimiter = delimiter;
this.maxCharsPerColumn = maxCharsPerColumn;
this.processor = new CSVGraphStoreRowProcessor(service, adminService, graphStoreClient, base, query, createGraph);
this.processor = new CSVGraphStoreRowProcessor(service, adminService, graphStoreClient, base, query);

CsvParserSettings parserSettings = new CsvParserSettings();
parserSettings.setLineSeparatorDetectionEnabled(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@
import jakarta.ws.rs.WebApplicationException;
import jakarta.ws.rs.core.Response;
import org.apache.jena.query.Query;
import org.apache.jena.rdf.model.Model;
import org.apache.jena.rdf.model.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -53,7 +51,6 @@ public class CSVGraphStoreOutputWriter implements Function<Response, CSVGraphSto
private final GraphStoreClient graphStoreClient;
private final String baseURI;
private final Query query;
private final Function<Model, Resource> createGraph;
private final char delimiter;

/**
Expand All @@ -64,17 +61,15 @@ public class CSVGraphStoreOutputWriter implements Function<Response, CSVGraphSto
* @param graphStoreClient GSP client
* @param baseURI base URI
* @param query transformation query
* @param createGraph function that derives graph URI from a document model
* @param delimiter CSV delimiter
*/
public CSVGraphStoreOutputWriter(Service service, Service adminService, GraphStoreClient graphStoreClient, String baseURI, Query query, Function<Model, Resource> createGraph, char delimiter)
public CSVGraphStoreOutputWriter(Service service, Service adminService, GraphStoreClient graphStoreClient, String baseURI, Query query, char delimiter)
{
this.service = service;
this.adminService = adminService;
this.graphStoreClient = graphStoreClient;
this.baseURI = baseURI;
this.query = query;
this.createGraph = createGraph;
this.delimiter = delimiter;
}

Expand All @@ -94,7 +89,7 @@ public CSVGraphStoreOutput apply(Response csvInput)

try (InputStream fis = new FileInputStream(tempFile); Reader reader = new InputStreamReader(fis, StandardCharsets.UTF_8))
{
CSVGraphStoreOutput output = new CSVGraphStoreOutput(getService(), getAdminService(), getGraphStoreClient(), reader, getBaseURI(), getQuery(), getCreateGraph(), getDelimiter(), null);
CSVGraphStoreOutput output = new CSVGraphStoreOutput(getService(), getAdminService(), getGraphStoreClient(), reader, getBaseURI(), getQuery(), getDelimiter(), null);
output.write();
return output;
}
Expand Down Expand Up @@ -170,14 +165,4 @@ public char getDelimiter()
return delimiter;
}

/**
* Returns function that is used to create graph names (URIs).
*
* @return function
*/
public Function<Model, Resource> getCreateGraph()
{
return createGraph;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@

import com.atomgraph.core.client.GraphStoreClient;
import com.atomgraph.linkeddatahub.model.Service;
import com.atomgraph.linkeddatahub.server.util.Skolemizer;
import com.univocity.parsers.common.ParsingContext;
import com.univocity.parsers.common.processor.RowProcessor;
import java.util.function.Function;
import jakarta.ws.rs.client.Client;
import jakarta.ws.rs.core.Response;
import org.apache.jena.atlas.lib.IRILib;
Expand All @@ -45,7 +43,6 @@ public class CSVGraphStoreRowProcessor implements RowProcessor // extends com.at
private final GraphStoreClient graphStoreClient;
private final String base;
private final Query query;
private final Function<Model, Resource> createGraph;
private int subjectCount, tripleCount;

/**
Expand All @@ -56,16 +53,14 @@ public class CSVGraphStoreRowProcessor implements RowProcessor // extends com.at
* @param graphStoreClient the GSP client
* @param base base URI
* @param query transformation query
* @param createGraph function that derives graph URI from a document model
*/
public CSVGraphStoreRowProcessor(Service service, Service adminService, GraphStoreClient graphStoreClient, String base, Query query, Function<Model, Resource> createGraph)
public CSVGraphStoreRowProcessor(Service service, Service adminService, GraphStoreClient graphStoreClient, String base, Query query)
{
this.service = service;
this.adminService = adminService;
this.graphStoreClient = graphStoreClient;
this.base = base;
this.query = query;
this.createGraph = createGraph;
}

@Override
Expand All @@ -79,17 +74,7 @@ public void rowProcessed(String[] row, ParsingContext context)
{
Dataset rowDataset = transformRow(row, context);

// graph name not specified, will be assigned by the server. Exceptions get swallowed by the client! TO-DO: wait for completion
if (!rowDataset.getDefaultModel().isEmpty())
{
String graphUri = getCreateGraph().apply(rowDataset.getDefaultModel()).getURI();
new Skolemizer(graphUri).apply(rowDataset.getDefaultModel());
getGraphStoreClient().add(graphUri, rowDataset.getDefaultModel());

// purge cache entries that include the graph URI
if (getService().getBackendProxy() != null) ban(getService().getClient(), getService().getBackendProxy(), graphUri).close();
if (getAdminService() != null && getAdminService().getBackendProxy() != null) ban(getAdminService().getClient(), getAdminService().getBackendProxy(), graphUri).close();
}
// the default graph is ignored!

rowDataset.listNames().forEachRemaining(graphUri ->
{
Expand Down Expand Up @@ -231,15 +216,4 @@ public int getTripleCount()
return tripleCount;
}

/**
* Returns function that is used to create graph names (URIs).
*
* @return function
*/
public Function<Model, Resource> getCreateGraph()
{
return createGraph;
}


}

0 comments on commit a1abef1

Please sign in to comment.