Skip to content

Commit

Permalink
Merge pull request #26 from alliance-genome/add_queuing
Browse files Browse the repository at this point in the history
Added queuing and reorg
  • Loading branch information
oblodgett authored Aug 31, 2021
2 parents 0a6388b + caf3ea5 commit 150a03e
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 40 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,10 @@
<artifactId>jackson-dataformat-xml</artifactId>
</dependency>

<!-- <dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-artemis-jms</artifactId>
</dependency> -->
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-jaxb</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package org.alliancegenome.curation_api.bulk.controllers;

import java.util.*;

import javax.enterprise.context.RequestScoped;
import javax.inject.Inject;
import javax.jms.*;

import org.alliancegenome.curation_api.consumers.GeneDTOConsumer;
import org.alliancegenome.curation_api.interfaces.bulk.GeneBulkRESTInterface;
import org.alliancegenome.curation_api.model.ingest.json.dto.*;
import org.alliancegenome.curation_api.services.GeneService;
import org.alliancegenome.curation_api.util.ProcessDisplayHelper;

import lombok.extern.jbosslog.JBossLog;
Expand All @@ -14,18 +17,19 @@
@RequestScoped
public class GeneBulkController implements GeneBulkRESTInterface {

@Inject GeneService geneService;

@Inject GeneDTOConsumer geneDTOConsumer;
@Override
public String updateBGI(GeneMetaDataDTO geneData) {

ProcessDisplayHelper ph = new ProcessDisplayHelper(10000);
ph.startProcess("Gene Update", geneData.getData().size());
for(GeneDTO gene: geneData.getData()) {
geneService.processUpdate(gene);

for(GeneDTO gene: geneData.getData()) {
geneDTOConsumer.send(gene);
ph.progressProcess();
}

ph.finishProcess();

return "OK";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package org.alliancegenome.curation_api.consumers;

import java.util.concurrent.*;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.context.control.ActivateRequestContext;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import javax.jms.*;

import org.alliancegenome.curation_api.model.ingest.json.dto.GeneDTO;
import org.alliancegenome.curation_api.services.GeneService;

import io.quarkus.runtime.*;
import lombok.extern.jbosslog.JBossLog;

@JBossLog
@ApplicationScoped
public class GeneDTOConsumer implements Runnable {

@Inject GeneService geneService;

@Inject ConnectionFactory connectionFactory1;
@Inject ConnectionFactory connectionFactory2;

private JMSProducer producer;
private JMSContext context;

private int threadCount = 4;

private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(threadCount);

void onStart(@Observes StartupEvent ev) {
log.info("GeneDTOConsumer Starting:" + threadCount);
context = connectionFactory1.createContext(Session.AUTO_ACKNOWLEDGE);
producer = context.createProducer();
for(int i = 0; i < threadCount; i++) {
scheduler.scheduleWithFixedDelay(new Thread(this), 0L, 5L, TimeUnit.SECONDS);
}
}

void onStop(@Observes ShutdownEvent ev) {
log.info("Shutdown");
scheduler.shutdown();
context.close();
}

@Override @ActivateRequestContext
public void run() {
JMSContext ctx = null;
try {
ctx = connectionFactory2.createContext(Session.AUTO_ACKNOWLEDGE);
JMSConsumer consumer = ctx.createConsumer(ctx.createQueue("geneQueue"));
while (true) {
geneService.processUpdate(consumer.receiveBody(GeneDTO.class));
}
} catch (Exception e) {
if(ctx != null) ctx.close();
log.info("Thread process failed: Error: " + e);
throw new RuntimeException(e);
}
}

public void send(GeneDTO gene) {
producer.send(context.createQueue("geneQueue"), context.createObjectMessage(gene));
}

}
14 changes: 14 additions & 0 deletions src/main/java/org/alliancegenome/curation_api/main/Main.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package org.alliancegenome.curation_api.main;

import io.quarkus.runtime.Quarkus;
import io.quarkus.runtime.annotations.QuarkusMain;

@QuarkusMain
public class Main {

public static void main(String[] args) {
System.out.println("Running main method of quarkus");
Quarkus.run(args);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,8 @@ public CrossReference processUpdate(CrossReferenceDTO crossReferenceDTO) {
crossReference.setPageAreas(crossReferenceDTO.getPages());
create(crossReference);
} else {
if(crossReference.getCurie().equals(crossReferenceDTO.getId())) {
crossReference.setPageAreas(crossReferenceDTO.getPages());
update(crossReference);
}
crossReference.setPageAreas(crossReferenceDTO.getPages());
update(crossReference);
}

return crossReference;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,27 +1,25 @@
package org.alliancegenome.curation_api.services;

import lombok.extern.jbosslog.JBossLog;
import org.alliancegenome.curation_api.base.BaseService;
import org.alliancegenome.curation_api.base.SearchResults;
import org.alliancegenome.curation_api.dao.CrossReferenceDAO;
import org.alliancegenome.curation_api.dao.GeneDAO;
import org.alliancegenome.curation_api.model.entities.CrossReference;
import org.alliancegenome.curation_api.model.entities.Gene;
import org.alliancegenome.curation_api.model.entities.Synonym;
import org.alliancegenome.curation_api.model.ingest.json.dto.CrossReferenceDTO;
import org.alliancegenome.curation_api.model.ingest.json.dto.GeneDTO;
import org.alliancegenome.curation_api.model.input.Pagination;
import org.apache.commons.collections4.CollectionUtils;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;

import javax.annotation.PostConstruct;
import javax.enterprise.context.RequestScoped;
import javax.enterprise.context.*;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import javax.jms.*;
import javax.transaction.Transactional;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import org.alliancegenome.curation_api.base.*;
import org.alliancegenome.curation_api.dao.*;
import org.alliancegenome.curation_api.model.entities.*;
import org.alliancegenome.curation_api.model.ingest.json.dto.*;
import org.alliancegenome.curation_api.model.input.Pagination;
import org.apache.commons.collections4.CollectionUtils;

import io.quarkus.runtime.*;
import lombok.extern.jbosslog.JBossLog;

@JBossLog
@RequestScoped
Expand All @@ -36,7 +34,6 @@ public class GeneService extends BaseService<Gene, GeneDAO> {
@Inject
SynonymService synonymService;


@Override
@PostConstruct
protected void init() {
Expand All @@ -59,15 +56,16 @@ public Gene getByIdOrCurie(String id) {

@Transactional
public void processUpdate(GeneDTO gene) {

//log.info("processUpdate Gene: ");

Gene g = geneDAO.find(gene.getBasicGeneticEntity().getPrimaryId());
boolean newGene = false;

if (g == null) {
g = new Gene();
g.setCurie(gene.getBasicGeneticEntity().getPrimaryId());
handleNewSynonyms(gene, g);
newGene = true;
handleNewSynonyms(gene, g);
} else {
handleUpdateSynonyms(gene, g);
}
Expand All @@ -80,6 +78,7 @@ public void processUpdate(GeneDTO gene) {
g.setTaxon(gene.getBasicGeneticEntity().getTaxonId());
g.setType(gene.getSoTermId());


List<CrossReferenceDTO> incomingCrossReferences = gene.getBasicGeneticEntity().getCrossReferences();
List<CrossReference> persitentCrossReferences = new ArrayList<>();
for (CrossReferenceDTO crossReferenceDTO : incomingCrossReferences) {
Expand All @@ -88,12 +87,8 @@ public void processUpdate(GeneDTO gene) {
}

g.setCrossReferences(persitentCrossReferences);
g.setSecondaryIdentifiers(gene.getBasicGeneticEntity().getSecondaryIds());

if(CollectionUtils.isNotEmpty(g.getSecondaryIdentifiers())) {
Set<String> secondaryIds = new LinkedHashSet<>(g.getSecondaryIdentifiers());
secondaryIds.addAll(gene.getBasicGeneticEntity().getSecondaryIds());
g.setSecondaryIdentifiers(new ArrayList<>(secondaryIds));
}
if (newGene) {
create(g);
} else {
Expand Down
6 changes: 3 additions & 3 deletions src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ quarkus.hibernate-orm.database.generation=update
quarkus.hibernate-search-orm.elasticsearch.version=7
quarkus.hibernate-search-orm.elasticsearch.hosts=localhost:9200

#quarkus.artemis.url=tcp://localhost:61616
#quarkus.artemis.username=quarkus
#quarkus.artemis.password=quarkus
quarkus.artemis.url=tcp://localhost:61616
quarkus.artemis.username=quarkus
quarkus.artemis.password=quarkus

quarkus.swagger-ui.always-include=true
quarkus.swagger-ui.doc-expansion=none
Expand Down

0 comments on commit 150a03e

Please sign in to comment.