Skip to content

Commit

Permalink
Merge pull request #5062 from inception-project/bugfix/5061-Multiple-…
Browse files Browse the repository at this point in the history
…synchronous-recommenders-only-the-last-one-wins

#5061 - Multiple synchronous recommenders only the last one wins
  • Loading branch information
reckart authored Sep 26, 2024
2 parents 60febcb + cffc4cb commit dd3581b
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ private void predictSingle(String aCoveredText, int aBegin, int aEnd, CAS aCas,
}

var predictedType = getPredictedType(aCas);
// Feature scoreFeature = getScoreFeature(aCas);
var scoreFeature = getScoreFeature(aCas);
var predictedFeature = getPredictedFeature(aCas);
var isPredictionFeature = getIsPredictionFeature(aCas);

Expand All @@ -167,6 +167,7 @@ private void predictSingle(String aCoveredText, int aBegin, int aEnd, CAS aCas,
var annotation = aCas.createAnnotation(predictedType, aBegin, aEnd);
annotation.setStringValue(predictedFeature, prediction.getIdentifier());
annotation.setBooleanValue(isPredictionFeature, true);
annotation.setDoubleValue(scoreFeature, prediction.getScore());
aCas.addFsToIndexes(annotation);
suggestionsCreated++;
if (suggestionsCreated >= recommender.getMaxRecommendations()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,10 @@ public void setUp() throws Exception
buildAnnotation(cas, NamedEntity.class).on("Barack Obama").buildAllAndAddToIndexes();
SegmentationUtils.splitSentences(cas);
SegmentationUtils.tokenize(cas);
RecommenderTestHelper.addPredictionFeatures(cas, NamedEntity.class, "value");
RecommenderTestHelper.addPredictionFeatures(cas, NamedEntity.class,
NamedEntity._FeatName_value);
RecommenderTestHelper.addPredictionFeatures(cas, NamedEntity.class,
NamedEntity._FeatName_identifier);
}

@AfterEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import static java.util.Collections.unmodifiableList;
import static java.util.Collections.unmodifiableMap;
import static java.util.Comparator.comparing;
import static java.util.Comparator.comparingInt;
import static java.util.Comparator.comparingDouble;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.toList;
import static org.apache.commons.collections4.IteratorUtils.unmodifiableIterator;
Expand Down Expand Up @@ -174,10 +174,10 @@ private Map<LabelMapKey, Map<Long, T>> suggestionsByLabel(Preferences aPreferenc

public List<T> bestSuggestions(Preferences aPreferences)
{
Map<LabelMapKey, Map<Long, T>> labelMap = suggestionsByLabel(aPreferences);
var labelMap = suggestionsByLabel(aPreferences);

// Determine the maximum score per Label
Map<LabelMapKey, Double> maxScorePerLabel = new HashMap<>();
var maxScorePerLabel = new HashMap<LabelMapKey, Double>();
for (var label : labelMap.keySet()) {
double maxScore = labelMap.get(label).values().stream()
.mapToDouble(AnnotationSuggestion::getScore).max().orElse(0.0d);
Expand All @@ -195,13 +195,14 @@ public List<T> bestSuggestions(Preferences aPreferences)

// Create VID using the recommendation with the lowest recommendationId
List<T> canonicalSuggestions = new ArrayList<>();
for (LabelMapKey label : sortedAndFiltered) {
// Pick out the recommendations with the lowest recommendationId as canonical for
// generating the VID
for (var label : sortedAndFiltered) {
// Pick out the recommendations with the highest score and then lowest recommendationId
// as canonical for generating the VID
T ao = stream()
// check for label or feature for no-label annotations as key
.filter(p -> label.equalsAnnotationSuggestion(p))
.max(comparingInt(AnnotationSuggestion::getId)) //
.max(comparingDouble(AnnotationSuggestion::getScore)
.thenComparingInt(AnnotationSuggestion::getId)) //
.orElse(null);

if (ao != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -800,23 +800,23 @@ private void runSynchronousRecommenders(SourceDocument aDocument, String aDataOw
return;
}

var anySyncRan = false;
var syncRecommenders = new ArrayList<Recommender>();
for (var recommender : recommenders) {
var factory = getRecommenderFactory(recommender);
if (factory.map($ -> $.isSynchronous(recommender)).orElse(false)) {
schedulingService.executeSync(PredictionTask.builder() //
.withSessionOwner(sessionOwner) //
.withTrigger(aTrigger) //
.withCurrentDocument(aDocument) //
.withDataOwner(aDataOwner) //
.withRecommender(recommender) //
.build());

anySyncRan = true;
syncRecommenders.add(recommender);
}
}

if (anySyncRan) {
if (!syncRecommenders.isEmpty()) {
schedulingService.executeSync(PredictionTask.builder() //
.withSessionOwner(sessionOwner) //
.withTrigger(aTrigger) //
.withCurrentDocument(aDocument) //
.withDataOwner(aDataOwner) //
.withRecommender(syncRecommenders.toArray(Recommender[]::new)) //
.build());

var switched = forceSwitchPredictions(sessionOwner.getUsername(),
aDocument.getProject());
if (switched) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static de.tudarmstadt.ukp.inception.recommendation.api.recommender.TrainingCapability.TRAINING_NOT_SUPPORTED;
import static de.tudarmstadt.ukp.inception.rendering.model.Range.rangeCoveringDocument;
import static java.lang.System.currentTimeMillis;
import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.toList;
Expand Down Expand Up @@ -93,7 +94,7 @@ public class PredictionTask
private final int predictionEnd;
private final String dataOwner;
private final boolean isolated;
private final Recommender recommender;
private final List<Recommender> recommenders;
private final boolean synchronousRecommenders;
private final boolean asynchronousRecommenders;

Expand All @@ -108,7 +109,7 @@ public PredictionTask(Builder<? extends Builder<?>> aBuilder)
predictionBegin = aBuilder.predictionBegin;
predictionEnd = aBuilder.predictionEnd;
isolated = aBuilder.isolated;
recommender = aBuilder.recommender;
recommenders = aBuilder.recommenders;
synchronousRecommenders = aBuilder.synchronousRecommenders;
asynchronousRecommenders = aBuilder.asynchronousRecommenders;
}
Expand Down Expand Up @@ -227,7 +228,7 @@ private Predictions generatePredictionsOnAllDocuments(List<SourceDocument> aDocu

monitor.setProgressWithMessage(progress, maxProgress,
LogMessage.info(this, "%s", document.getName()));
applyAllRecommendersToDocument(activePredictions, incomingPredictions,
applyActiveRecommendersToDocument(activePredictions, incomingPredictions,
casHolder.cas, document, -1, -1);
progress++;
}
Expand Down Expand Up @@ -285,19 +286,21 @@ private Predictions generatePredictionsOnSingleDocument(SourceDocument aCurrentD
try (var casHolder = new PredictionCasHolder()) {
var predictionCas = casHolder.cas;

if (recommender != null) {
if (isolated) {
var originalCas = new LazyCas(aCurrentDocument);
try {
applySingleRecomenderToDocument(originalCas, recommender,
predecessorPredictions, incomingPredictions, predictionCas,
aCurrentDocument, predictionBegin, predictionEnd);
}
catch (IOException e) {
logUnableToReadAnnotations(incomingPredictions, aCurrentDocument, e);
for (var recommender : recommenders) {
try {
applySingleRecomenderToDocument(originalCas, recommender,
predecessorPredictions, incomingPredictions, predictionCas,
aCurrentDocument, predictionBegin, predictionEnd);
}
catch (IOException e) {
logUnableToReadAnnotations(incomingPredictions, aCurrentDocument, e);
}
}
}
else {
applyAllRecommendersToDocument(predecessorPredictions, incomingPredictions,
applyActiveRecommendersToDocument(predecessorPredictions, incomingPredictions,
predictionCas, aCurrentDocument, predictionBegin, predictionEnd);
}
}
Expand Down Expand Up @@ -337,7 +340,7 @@ private Predictions getPredecessorPredictions(User sessionOwner, Project project
* @param aPredictionEnd
* end of the prediction range (negative to predict until the end of the document)
*/
private void applyAllRecommendersToDocument(Predictions aActivePredictions,
private void applyActiveRecommendersToDocument(Predictions aActivePredictions,
Predictions aPredictions, CAS aPredictionCas, SourceDocument aDocument,
int aPredictionBegin, int aPredictionEnd)
{
Expand Down Expand Up @@ -411,75 +414,75 @@ private void applySingleRecomenderToDocument(LazyCas aOriginalCas, Recommender a
return;
}

// We lazily load the CAS only at this point because that allows us to skip
// loading the CAS entirely if there is no enabled layer or recommender.
// If the CAS cannot be loaded, then we skip to the next document.
var originalCas = aOriginalCas.get();

try {
if (recommender != null && !recommender.equals(aRecommender)) {
logSkippingNotRequestedRecommender(aPredictions, aRecommender);
if (!recommenders.isEmpty() && !recommenders.contains(aRecommender)) {
logSkippingNotRequestedRecommender(aPredictions, aRecommender);

if (activePredictions != null) {
inheritSuggestionsAtRecommenderLevel(aPredictions, originalCas, aRecommender,
activePredictions, aDocument);
}

return;
if (activePredictions != null) {
inheritSuggestionsAtRecommenderLevel(aPredictions, aRecommender, activePredictions,
aDocument);
}

var engine = factory.build(aRecommender);
return;
}

var isSynchronous = factory.isSynchronous(aRecommender);
if (isSynchronous && !synchronousRecommenders) {
logSkippingSynchronous(aPredictions, aRecommender);
var engine = factory.build(aRecommender);

// If possible, we inherit recommendations from a previous run while
// the recommender is still busy
if (activePredictions != null) {
inheritSuggestionsAtRecommenderLevel(aPredictions, originalCas, aRecommender,
activePredictions, aDocument);
}
var isSynchronous = factory.isSynchronous(aRecommender);
if (isSynchronous && !synchronousRecommenders) {
logSkippingSynchronous(aPredictions, aRecommender);

return;
// If possible, we inherit recommendations from a previous run while the recommender is
// still busy
if (activePredictions != null) {
inheritSuggestionsAtRecommenderLevel(aPredictions, aRecommender, activePredictions,
aDocument);
}

if (!isSynchronous && !asynchronousRecommenders) {
logSkippingAsynchronous(aPredictions, aRecommender);
return;
}

// If possible, we inherit recommendations from a previous run while
// the recommender is still busy
if (activePredictions != null) {
inheritSuggestionsAtRecommenderLevel(aPredictions, originalCas, aRecommender,
activePredictions, aDocument);
}
if (!isSynchronous && !asynchronousRecommenders) {
logSkippingAsynchronous(aPredictions, aRecommender);

return;
// If possible, we inherit recommendations from a previous run while the recommender is
// still busy
if (activePredictions != null) {
inheritSuggestionsAtRecommenderLevel(aPredictions, aRecommender, activePredictions,
aDocument);
}

if (!engine.isReadyForPrediction(context.get())) {
logRecommenderContextNoReady(aPredictions, aDocument, aRecommender);
return;
}

// If possible, we inherit recommendations from a previous run while
// the recommender is still busy
if (activePredictions != null) {
inheritSuggestionsAtRecommenderLevel(aPredictions, originalCas, aRecommender,
activePredictions, aDocument);
}
if (!engine.isReadyForPrediction(context.get())) {
logRecommenderContextNoReady(aPredictions, aDocument, aRecommender);

return;
// If possible, we inherit recommendations from a previous run while the recommender is
// still busy
if (activePredictions != null) {
inheritSuggestionsAtRecommenderLevel(aPredictions, aRecommender, activePredictions,
aDocument);
}

// If the recommender is not trainable and not sensitive to annotations,
// we can actually re-use the predictions.
if (TRAINING_NOT_SUPPORTED == engine.getTrainingCapability()
&& PREDICTION_USES_TEXT_ONLY == engine.getPredictionCapability()
&& activePredictions != null
&& activePredictions.hasRunPredictionOnDocument(aDocument)) {
inheritSuggestionsAtRecommenderLevel(aPredictions, originalCas,
engine.getRecommender(), activePredictions, aDocument);
return;
}
return;
}

// If the recommender is not trainable and not sensitive to annotations, we can actually
// re-use the predictions.
if (TRAINING_NOT_SUPPORTED == engine.getTrainingCapability()
&& PREDICTION_USES_TEXT_ONLY == engine.getPredictionCapability()
&& activePredictions != null
&& activePredictions.hasRunPredictionOnDocument(aDocument)) {
inheritSuggestionsAtRecommenderLevel(aPredictions, engine.getRecommender(),
activePredictions, aDocument);
return;
}

try {
// We lazily load the CAS only at this point because that allows us to skip loading the
// CAS entirely if there is no enabled layer or recommender. If the CAS cannot be
// loaded, then we skip to the next document.
var originalCas = aOriginalCas.get();

var ctx = new PredictionContext(context.get());
cloneAndMonkeyPatchCAS(getProject(), originalCas, predictionCas);
Expand All @@ -489,8 +492,8 @@ private void applySingleRecomenderToDocument(LazyCas aOriginalCas, Recommender a
predictionCas, predictionRange);
ctx.getMessages().forEach(aPredictions::log);
}
// Catching Throwable is intentional here as we want to continue the
// execution even if a particular recommender fails.
// Catching Throwable is intentional here as we want to continue the execution even if a
// particular recommender fails.
catch (Throwable e) {
logErrorExecutingRecommender(aPredictions, aDocument, aRecommender, e);

Expand All @@ -500,12 +503,11 @@ private void applySingleRecomenderToDocument(LazyCas aOriginalCas, Recommender a
aRecommender.getName(), e.getMessage())) //
.build());

// If there was a previous successful run of the recommender, inherit
// its suggestions to avoid that all the suggestions of the recommender
// simply disappear.
// If there was a previous successful run of the recommender, inherit its suggestions to
// avoid that all the suggestions of the recommender simply disappear.
if (activePredictions != null) {
inheritSuggestionsAtRecommenderLevel(aPredictions, originalCas, aRecommender,
activePredictions, aDocument);
inheritSuggestionsAtRecommenderLevel(aPredictions, aRecommender, activePredictions,
aDocument);
}

return;
Expand Down Expand Up @@ -570,7 +572,7 @@ private void invokeRecommender(Predictions aIncomingPredictions, PredictionConte
* Extracts existing predictions from the last prediction run so we do not have to recalculate
* them. This is useful when the engine is not trainable.
*/
private void inheritSuggestionsAtRecommenderLevel(Predictions aPredictions, CAS aOriginalCas,
private void inheritSuggestionsAtRecommenderLevel(Predictions aPredictions,
Recommender aRecommender, Predictions activePredictions, SourceDocument document)
{
var suggestions = activePredictions.getPredictionsByRecommenderAndDocument(aRecommender,
Expand Down Expand Up @@ -1012,7 +1014,7 @@ public static Builder<Builder<?>> builder()
public static class Builder<T extends Builder<?>>
extends RecommendationTask_ImplBase.Builder<T>
{
private Recommender recommender;
private final List<Recommender> recommenders = new ArrayList<>();
private SourceDocument currentDocument;
private String dataOwner;
private int predictionBegin = -1;
Expand All @@ -1025,13 +1027,15 @@ public static class Builder<T extends Builder<?>>
* Generate predictions only for the specified recommender. If this is not set, then
* predictions will be run for all active recommenders.
*
* @param aRecommender
* the one recommender to run.
* @param aRecommenders
* the recommenders to run.
*/
@SuppressWarnings("unchecked")
public T withRecommender(Recommender aRecommender)
public T withRecommender(Recommender... aRecommenders)
{
recommender = aRecommender;
if (aRecommenders != null) {
recommenders.addAll(asList(aRecommenders));
}
return (T) this;
}

Expand Down

0 comments on commit dd3581b

Please sign in to comment.