Skip to content

Commit

Permalink
Merge pull request #145 from aodn/feature/apply-retry-calling-ardc-en…
Browse files Browse the repository at this point in the history
…dpoints

retry template for rest template
  • Loading branch information
utas-raymondng authored Sep 30, 2024
2 parents 94d5856 + f249ef3 commit 3d52fb0
Show file tree
Hide file tree
Showing 13 changed files with 147 additions and 82 deletions.
4 changes: 4 additions & 0 deletions ardcvocabs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@
<artifactId>jsonassert</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,48 @@
import au.org.aodn.ardcvocabs.service.ArdcVocabServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.retry.annotation.EnableRetry;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.web.client.RestTemplate;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;

@Slf4j
@AutoConfiguration // More expressive vs @Configuration
@AutoConfiguration
@ConditionalOnMissingBean(ArdcVocabService.class)
@EnableRetry // Enable retry support
public class ArdcAutoConfiguration {

@Bean
public ArdcVocabService createArdcVocabsService(RestTemplate restTemplate) {
public ArdcVocabService createArdcVocabsService(RestTemplate restTemplate, RetryTemplate retryTemplate) {
log.info("Create ArdcVocabsService");
return new ArdcVocabServiceImpl(restTemplate);
return new ArdcVocabServiceImpl(restTemplate, retryTemplate);
}
/**
* In case the one who use this lib have not created it.
* @return RestTemplate
*/

@Bean
@ConditionalOnMissingBean(RestTemplate.class)
public RestTemplate ardcVocabRestTemplate() {
return new RestTemplate();
}

@Bean
public RetryTemplate retryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();

// Configure retry policy (3 attempts)
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(3);
retryTemplate.setRetryPolicy(retryPolicy);

// Configure backoff policy (exponential backoff starting at 1 second, doubling each time)
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(1000); // 1 second
backOffPolicy.setMultiplier(2); // 2x each retry
backOffPolicy.setMaxInterval(5000); // max 5 seconds
retryTemplate.setBackOffPolicy(backOffPolicy);

return retryTemplate;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.web.client.RestClientException;
import org.springframework.web.client.RestTemplate;

Expand All @@ -23,14 +24,16 @@ public class ArdcVocabServiceImpl implements ArdcVocabService {
protected String vocabApiBase;

protected RestTemplate restTemplate;
protected RetryTemplate retryTemplate;

protected Function<JsonNode, String> label = (node) -> node.has("prefLabel") ? node.get("prefLabel").get("_value").asText() : null;
protected Function<JsonNode, String> about = (node) -> node.has("_about") ? node.get("_about").asText() : null;
protected Function<JsonNode, String> definition = (node) -> node.has("definition") ? node.get("definition").asText() : null;
protected BiFunction<JsonNode, String, Boolean> isNodeValid = (node, item) -> node != null && !node.isEmpty() && node.has(item) && !node.get(item).isEmpty();

public ArdcVocabServiceImpl(RestTemplate restTemplate) {
public ArdcVocabServiceImpl(RestTemplate restTemplate, RetryTemplate retryTemplate) {
this.restTemplate = restTemplate;
this.retryTemplate = retryTemplate;
}

protected VocabModel buildVocabByResourceUri(String vocabUri, String vocabApiBase, VocabApiPaths vocabApiPaths) {
Expand All @@ -42,7 +45,7 @@ protected VocabModel buildVocabByResourceUri(String vocabUri, String vocabApiBas

try {
log.debug("Query api -> {}", detailsUrl);
ObjectNode detailsObj = restTemplate.getForObject(detailsUrl, ObjectNode.class);
ObjectNode detailsObj = retryTemplate.execute(context -> restTemplate.getForObject(detailsUrl, ObjectNode.class));
if(isNodeValid.apply(detailsObj, "result") && isNodeValid.apply(detailsObj.get("result"), "primaryTopic")) {
JsonNode target = detailsObj.get("result").get("primaryTopic");

Expand Down Expand Up @@ -106,7 +109,8 @@ protected Map<String, List<VocabModel>> getVocabLeafNodes(String vocabApiBase, V
while (url != null && !url.isEmpty()) {
try {
log.debug("getVocabLeafNodes -> {}", url);
ObjectNode r = restTemplate.getForObject(url, ObjectNode.class);
String finalUrl = url;
ObjectNode r = retryTemplate.execute(context -> restTemplate.getForObject(finalUrl, ObjectNode.class));

if (r != null && !r.isEmpty()) {
JsonNode node = r.get("result");
Expand All @@ -117,8 +121,7 @@ protected Map<String, List<VocabModel>> getVocabLeafNodes(String vocabApiBase, V
String dl = String.format(vocabApiBase + vocabApiPaths.getVocabDetailsApiPath(), about.apply(j));
try {
log.debug("getVocabLeafNodes -> {}", dl);
ObjectNode d = restTemplate.getForObject(dl, ObjectNode.class);

ObjectNode d = retryTemplate.execute(context -> restTemplate.getForObject(dl, ObjectNode.class));
if(isNodeValid.apply(d, "result") && isNodeValid.apply(d.get("result"), "primaryTopic")) {
JsonNode target = d.get("result").get("primaryTopic");

Expand Down Expand Up @@ -203,8 +206,8 @@ public List<VocabModel> getVocabTreeFromArdcByType(VocabApiPaths vocabApiPaths)
while (url != null && !url.isEmpty()) {
try {
log.debug("Query api -> {}", url);
ObjectNode r = restTemplate.getForObject(url, ObjectNode.class);

String finalUrl = url;
ObjectNode r = retryTemplate.execute(context -> restTemplate.getForObject(finalUrl, ObjectNode.class));
if (r != null && !r.isEmpty()) {
JsonNode node = r.get("result");
if (!node.isEmpty() && node.has("items") && !node.get("items").isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.stubbing.Answer;
import org.skyscreamer.jsonassert.JSONAssert;
import org.skyscreamer.jsonassert.JSONCompareMode;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.web.client.RestTemplate;

import java.io.FileNotFoundException;
Expand All @@ -26,6 +30,8 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.springframework.test.util.AssertionErrors.assertTrue;

@ExtendWith(MockitoExtension.class)
Expand Down Expand Up @@ -191,7 +197,7 @@ else if(url.contains("/aodn-organisation-category-vocabulary/version-2-5/resourc
public void init() {
// If you want real download for testing, uncomment below and do not use mock
//this.ardcVocabService = new ArdcVocabServiceImpl(new RestTemplate());
this.ardcVocabService = new ArdcVocabServiceImpl(mockRestTemplate);
this.ardcVocabService = new ArdcVocabServiceImpl(mockRestTemplate, new RetryTemplate());
this.ardcVocabService.vocabApiBase = "https://vocabs.ardc.edu.au/repository/api/lda/aodn";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import au.org.aodn.ardcvocabs.model.VocabApiPaths;
import au.org.aodn.ardcvocabs.model.VocabModel;
import au.org.aodn.ardcvocabs.service.ArdcVocabService;
import au.org.aodn.esindexer.configuration.AppConstants;
import au.org.aodn.esindexer.service.VocabService;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -12,7 +11,6 @@
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
public interface VocabService {
List<String> extractVocabLabelsFromThemes(List<ThemesModel> themes, String vocabType) throws IOException;

void populateVocabsData() throws IOException, InterruptedException, ExecutionException;
void populateVocabsData() throws IOException;
void populateVocabsDataAsync();
void clearParameterVocabCache();
void clearPlatformVocabCache();
void clearOrganisationVocabCache();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,83 +227,105 @@ protected void indexAllVocabs(List<VocabModel> parameterVocabs,
}

protected void bulkIndexVocabs(List<VocabDto> vocabs) throws IOException {
// count portal index documents, or create index if not found from defined mapping JSON file
BulkRequest.Builder bulkRequest = new BulkRequest.Builder();

for (VocabDto vocab : vocabs) {
try {
// convert vocab values to binary data
log.debug("Ingested json is {}", indexerObjectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(vocab));
// send bulk request to Elasticsearch
bulkRequest.operations(op -> op
.index(idx -> idx
.index(vocabsIndexName)
.document(vocab)
)
);
} catch (JsonProcessingException e) {
log.error("Failed to ingest parameterVocabs to {}", vocabsIndexName);
throw new RuntimeException(e);
if (!vocabs.isEmpty()) {
BulkRequest.Builder bulkRequest = new BulkRequest.Builder();

for (VocabDto vocab : vocabs) {
try {
// convert vocab values to binary data
log.debug("Ingested json is {}", indexerObjectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(vocab));
// send bulk request to Elasticsearch
bulkRequest.operations(op -> op
.index(idx -> idx
.index(vocabsIndexName)
.document(vocab)
)
);
} catch (JsonProcessingException e) {
log.error("Failed to ingest parameterVocabs to {}", vocabsIndexName);
throw new RuntimeException(e);
}
}
}

BulkResponse result = portalElasticsearchClient.bulk(bulkRequest.build());
BulkResponse result = portalElasticsearchClient.bulk(bulkRequest.build());

// Flush after insert, otherwise you need to wait for next auto-refresh. It is
// especially a problem with autotest, where assert happens very fast.
portalElasticsearchClient.indices().refresh();
// Flush after insert, otherwise you need to wait for next auto-refresh. It is
// especially a problem with autotest, where assert happens very fast.
portalElasticsearchClient.indices().refresh();

// Log errors, if any
if (result.errors()) {
log.error("Bulk had errors");
for (BulkResponseItem item: result.items()) {
if (item.error() != null) {
log.error("{} {}", item.error().reason(), item.error().causedBy());
// Log errors, if any
if (result.errors()) {
log.error("Bulk had errors");
for (BulkResponseItem item: result.items()) {
if (item.error() != null) {
log.error("{} {}", item.error().reason(), item.error().causedBy());
}
}
} else {
log.info("Finished bulk indexing items to index: {}", vocabsIndexName);
}
log.info("Total documents in index: {} is {}", vocabsIndexName, elasticSearchIndexService.getDocumentsCount(vocabsIndexName));
} else {
log.info("Finished bulk indexing items to index: {}", vocabsIndexName);
log.error("No vocabs to be indexed, nothing to index");
}
log.info("Total documents in index: {} is {}", vocabsIndexName, elasticSearchIndexService.getDocumentsCount(vocabsIndexName));
}

public void populateVocabsData() {
log.info("Starting async vocab fetching process...");
public void populateVocabsData() throws IOException {
log.info("Starting fetching vocabs data process synchronously...");

ExecutorService executorService = Executors.newFixedThreadPool(3);
List<VocabModel> parameterVocabs = ardcVocabService.getVocabTreeFromArdcByType(VocabApiPaths.PARAMETER_VOCAB);
List<VocabModel> platformVocabs = ardcVocabService.getVocabTreeFromArdcByType(VocabApiPaths.PLATFORM_VOCAB);
List<VocabModel> organisationVocabs = ardcVocabService.getVocabTreeFromArdcByType(VocabApiPaths.ORGANISATION_VOCAB);

indexAllVocabs(parameterVocabs, platformVocabs, organisationVocabs);
}

List<Callable<List<VocabModel>>> tasks = List.of(
public void populateVocabsDataAsync() {
log.info("Starting async vocabs data fetching process...");

ExecutorService executorService = Executors.newFixedThreadPool(3);
List<Callable<List<VocabModel>>> vocabTasks = List.of(
createVocabFetchTask(VocabApiPaths.PARAMETER_VOCAB, "parameter"),
createVocabFetchTask(VocabApiPaths.PLATFORM_VOCAB, "platform"),
createVocabFetchTask(VocabApiPaths.ORGANISATION_VOCAB, "organisation")
);

// Run asynchronously so the app can continue starting
CompletableFuture.runAsync(() -> {
try {
List<Future<List<VocabModel>>> completed = executorService.invokeAll(tasks);
// Invoke all tasks and wait for completion
List<Future<List<VocabModel>>> completedFutures = executorService.invokeAll(vocabTasks);

// Ensure all tasks are completed and check for exceptions
List<List<VocabModel>> allResults = new ArrayList<>();
for (Future<List<VocabModel>> future : completedFutures) {
try {
allResults.add(future.get()); // Blocks until the task is completed and retrieves the result
} catch (Exception taskException) {
log.error("Task failed with an exception", taskException);
// Handle failure for this particular task
allResults.add(Collections.emptyList()); // add empty result for failed task
}
}

// Call indexAllVocabs only after all tasks are completed
log.info("Indexing fetched vocabs to {}", vocabsIndexName);
indexAllVocabs(completed.get(0).get(), completed.get(1).get(), completed.get(2).get());
} catch (Exception e) {
log.error("Error processing vocabs data", e);
indexAllVocabs(allResults.get(0), allResults.get(1), allResults.get(2));

} catch (InterruptedException | IOException e) {
Thread.currentThread().interrupt(); // Restore interrupt status
log.error("Thread was interrupted while processing vocab tasks", e);
} finally {
executorService.shutdown();
}
});

log.info("Vocab fetching process started in the background.");
log.info("Vocabs data fetching process started in the background.");
}

private Callable<List<VocabModel>> createVocabFetchTask(VocabApiPaths vocabType, String vocabName) {
return () -> {
try {
log.info("Fetching {} vocabs from ARDC", vocabName);
return ardcVocabService.getVocabTreeFromArdcByType(vocabType);
} catch (Exception e) {
log.error("Error fetching {} vocabs", vocabName, e);
return Collections.emptyList();
}
log.info("Fetching {} vocabs from ARDC", vocabName);
return ardcVocabService.getVocabTreeFromArdcByType(vocabType);
};
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Profile;
import org.springframework.scheduling.annotation.Scheduled;

import java.io.IOException;
import java.util.concurrent.ExecutionException;


@Slf4j
Expand All @@ -26,24 +26,26 @@ public void setVocabService(VocabService vocabService) {
}

@PostConstruct
public void init() throws IOException, InterruptedException, ExecutionException {
// this could take a few minutes to complete, in development, you can skip it with -Dapp.initialiseVocabsIndex=false
// you can call /api/v1/indexer/ext/vocabs/populate endpoint to manually refresh the vocabs index, without waiting for the scheduled task
public void init() throws IOException {
// Check if the initialiseVocabsIndex flag is enabled
if (initialiseVocabsIndex) {
log.info("Initialising {}", vocabsIndexName);
vocabService.populateVocabsData();
log.info("Initialising {} asynchronously", vocabsIndexName);
vocabService.populateVocabsDataAsync();
}
}

@Scheduled(cron = "0 0 0 * * *")
public void scheduledRefreshVocabsData() throws IOException, ExecutionException, InterruptedException {
public void scheduledRefreshVocabsData() throws IOException {
log.info("Refreshing ARDC vocabularies data");

// call synchronous populating method, otherwise existing vocab caches will be emptied while new data hasn't been fully processed yet.
vocabService.populateVocabsData();

// clear existing caches
vocabService.clearParameterVocabCache();
vocabService.clearPlatformVocabCache();
vocabService.clearOrganisationVocabCache();
// populate latest vocabs
vocabService.populateVocabsData();

// update the caches
vocabService.getParameterVocabs();
vocabService.getPlatformVocabs();
Expand Down
Loading

0 comments on commit 3d52fb0

Please sign in to comment.