Skip to content

Commit

Permalink
Merge pull request #142 from aodn/bug/5937-run-ardc-services-in-backg…
Browse files Browse the repository at this point in the history
…round

run the fetch tasks in background
  • Loading branch information
utas-raymondng authored Sep 24, 2024
2 parents 9c0073f + be041f7 commit 02b488e
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,8 @@ public void clearOrganisationVocabCache() {
}

protected void indexAllVocabs(List<VocabModel> parameterVocabs,
List<VocabModel> platformVocabs,
List<VocabModel> organisationVocabs) throws IOException {
List<VocabModel> platformVocabs,
List<VocabModel> organisationVocabs) throws IOException {

List<VocabDto> vocabDtos = new ArrayList<>();

Expand Down Expand Up @@ -267,31 +267,43 @@ protected void bulkIndexVocabs(List<VocabDto> vocabs) throws IOException {
log.info("Total documents in index: {} is {}", vocabsIndexName, elasticSearchIndexService.getDocumentsCount(vocabsIndexName));
}

public void populateVocabsData() throws IOException, InterruptedException, ExecutionException {
public void populateVocabsData() {
log.info("Starting async vocab fetching process...");

Callable<List<VocabModel>> parameterVocabs = () -> {
log.info("Fetching parameter vocabs from ARDC");
return ardcVocabService.getVocabTreeFromArdcByType(VocabApiPaths.PARAMETER_VOCAB);
};

Callable<List<VocabModel>> platformVocabs = () -> {
log.info("Fetching platform vocabs from ARDC");
return ardcVocabService.getVocabTreeFromArdcByType(VocabApiPaths.PLATFORM_VOCAB);
};

Callable<List<VocabModel>> organisationVocabs = () -> {
log.info("Fetching organisation vocabs from ARDC");
return ardcVocabService.getVocabTreeFromArdcByType(VocabApiPaths.ORGANISATION_VOCAB);
};
// Make it execute with threads the same time to speed up the load.
ExecutorService executorService = Executors.newFixedThreadPool(3);

List<Callable<List<VocabModel>>> tasks = List.of(parameterVocabs, platformVocabs, organisationVocabs);
List<Future<List<VocabModel>>> completed = executorService.invokeAll(tasks);
List<Callable<List<VocabModel>>> tasks = List.of(
createVocabFetchTask(VocabApiPaths.PARAMETER_VOCAB, "parameter"),
createVocabFetchTask(VocabApiPaths.PLATFORM_VOCAB, "platform"),
createVocabFetchTask(VocabApiPaths.ORGANISATION_VOCAB, "organisation")
);

log.info("Indexing fetched vocabs to {}", vocabsIndexName);
indexAllVocabs(completed.get(0).get(), completed.get(1).get(), completed.get(2).get());
// Run asynchronously so the app can continue starting
CompletableFuture.runAsync(() -> {
try {
List<Future<List<VocabModel>>> completed = executorService.invokeAll(tasks);
log.info("Indexing fetched vocabs to {}", vocabsIndexName);
indexAllVocabs(completed.get(0).get(), completed.get(1).get(), completed.get(2).get());
} catch (Exception e) {
log.error("Error processing vocabs data", e);
} finally {
executorService.shutdown();
}
});

executorService.shutdown();
log.info("Vocab fetching process started in the background.");
}

private Callable<List<VocabModel>> createVocabFetchTask(VocabApiPaths vocabType, String vocabName) {
return () -> {
try {
log.info("Fetching {} vocabs from ARDC", vocabName);
return ardcVocabService.getVocabTreeFromArdcByType(vocabType);
} catch (Exception e) {
log.error("Error fetching {} vocabs", vocabName, e);
return Collections.emptyList();
}
};
}

}
1 change: 1 addition & 0 deletions indexer/src/main/resources/application-dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ geonetwork:
logging:
level:
au.org.aodn.esindexer: DEBUG
au.org.aodn.ardcvocabs: DEBUG

management:
endpoints:
Expand Down

0 comments on commit 02b488e

Please sign in to comment.