Skip to content

Commit

Permalink
Fixes #3169 : Track event listeners keyed by type to allow earlier ev…
Browse files Browse the repository at this point in the history
…ent firing veto
  • Loading branch information
chrisdennis committed Jun 6, 2023
1 parent 4c9986d commit 2420642
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.ehcache.event.EventType;

import java.util.EnumSet;
import java.util.Set;

/**
* Internal wrapper for {@link CacheEventListener} and their configuration.
Expand Down Expand Up @@ -86,8 +87,8 @@ public void onEvent(CacheEvent<? extends K, ? extends V> event) {
return listener;
}

public boolean isForEventType(EventType type) {
return forEvents.contains(type);
public Set<EventType> getEventTypes() {
return forEvents;
}

public boolean isOrdered() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,21 @@
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.Stream;

import static java.util.Collections.unmodifiableMap;
import static java.util.EnumSet.allOf;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toMap;
import static java.util.stream.Stream.concat;

/**
* Per-cache component that manages cache event listener registrations, and provides event delivery based on desired
Expand All @@ -55,10 +65,12 @@ public class CacheEventDispatcherImpl<K, V> implements CacheEventDispatcher<K, V
private static final Logger LOGGER = LoggerFactory.getLogger(CacheEventDispatcherImpl.class);
private final ExecutorService unOrderedExectuor;
private final ExecutorService orderedExecutor;
private int listenersCount = 0;
private int orderedListenerCount = 0;
private final List<EventListenerWrapper<K, V>> syncListenersList = new CopyOnWriteArrayList<>();
private final List<EventListenerWrapper<K, V>> aSyncListenersList = new CopyOnWriteArrayList<>();

private final Map<EventType, List<EventListenerWrapper<K, V>>> syncListenersList = unmodifiableMap(allOf(EventType.class).stream()
.collect(toMap(identity(), t -> new CopyOnWriteArrayList<>(), (a, b) -> { throw new AssertionError(); }, () -> new EnumMap<>(EventType.class))));
private final Map<EventType, List<EventListenerWrapper<K, V>>> asyncListenersList = unmodifiableMap(allOf(EventType.class).stream()
.collect(toMap(identity(), t -> new CopyOnWriteArrayList<>(), (a, b) -> { throw new AssertionError(); }, () -> new EnumMap<>(EventType.class))));

private final StoreEventListener<K, V> eventListener = new StoreListener();

private volatile Cache<K, V> listenerSource;
Expand Down Expand Up @@ -94,69 +106,76 @@ public void registerCacheEventListener(CacheEventListener<? super K, ? super V>
* @param wrapper the listener wrapper to register
*/
private synchronized void registerCacheEventListener(EventListenerWrapper<K, V> wrapper) {
if(aSyncListenersList.contains(wrapper) || syncListenersList.contains(wrapper)) {

if(allListeners().anyMatch(wrapper::equals)) {
throw new IllegalStateException("Cache Event Listener already registered: " + wrapper.getListener());
}

if (wrapper.isOrdered() && orderedListenerCount++ == 0) {
boolean firstListener = !allListeners().findAny().isPresent();

if (wrapper.isOrdered() && (firstListener || allListeners().noneMatch(EventListenerWrapper::isOrdered))) {
storeEventSource.setEventOrdering(true);
}

switch (wrapper.getFiringMode()) {
case ASYNCHRONOUS:
aSyncListenersList.add(wrapper);
wrapper.getEventTypes().forEach(type -> asyncListenersList.get(type).add(wrapper));
break;
case SYNCHRONOUS:
if (syncListenersList.isEmpty()) {
storeEventSource.setSynchronous(true);
}
syncListenersList.add(wrapper);
wrapper.getEventTypes().forEach(type -> syncListenersList.get(type).add(wrapper));
break;
default:
throw new AssertionError("Unhandled EventFiring value: " + wrapper.getFiringMode());
}

if (listenersCount++ == 0) {
if (firstListener) {
storeEventSource.addEventListener(eventListener);
}
}

private Stream<EventListenerWrapper<K, V>> allListeners() {
return concat(asyncListeners(), syncListeners());
}

private Stream<EventListenerWrapper<K, V>> syncListeners() {
return syncListenersList.values().stream().flatMap(Collection::stream);
}

private Stream<EventListenerWrapper<K, V>> asyncListeners() {
return asyncListenersList.values().stream().flatMap(Collection::stream);
}

/**
* {@inheritDoc}
*/
@Override
public void deregisterCacheEventListener(CacheEventListener<? super K, ? super V> listener) {
public synchronized void deregisterCacheEventListener(CacheEventListener<? super K, ? super V> listener) {
EventListenerWrapper<K, V> wrapper = new EventListenerWrapper<>(listener);

if (!removeWrapperFromList(wrapper, aSyncListenersList)) {
if (!removeWrapperFromList(wrapper, syncListenersList)) {
throw new IllegalStateException("Unknown cache event listener: " + listener);
}
boolean removed = Stream.of(asyncListenersList, syncListenersList)
.flatMap(list -> list.values().stream())
.map(list -> list.remove(wrapper))
.reduce((a, b) -> a || b).orElse(false);

if (!removed) {
throw new IllegalStateException("Unknown cache event listener: " + listener);
}
}

/**
* Synchronized to make sure listener removal is atomic
*
* @param wrapper the listener wrapper to unregister
* @param listenersList the listener list to remove from
*/
private synchronized boolean removeWrapperFromList(EventListenerWrapper<K, V> wrapper, List<EventListenerWrapper<K, V>> listenersList) {
int index = listenersList.indexOf(wrapper);
if (index != -1) {
EventListenerWrapper<K, V> containedWrapper = listenersList.remove(index);
if(containedWrapper.isOrdered() && --orderedListenerCount == 0) {
if (!allListeners().findAny().isPresent()) {
storeEventSource.setSynchronous(false);
storeEventSource.setEventOrdering(false);
storeEventSource.removeEventListener(eventListener);
} else {
if (allListeners().noneMatch(EventListenerWrapper::isOrdered)) {
storeEventSource.setEventOrdering(false);
}
if (--listenersCount == 0) {
storeEventSource.removeEventListener(eventListener);
}
if (syncListenersList.isEmpty()) {
if (!syncListeners().findAny().isPresent()) {
storeEventSource.setSynchronous(false);
}
return true;
}
return false;
}

/**
Expand All @@ -167,8 +186,8 @@ public synchronized void shutdown() {
storeEventSource.removeEventListener(eventListener);
storeEventSource.setEventOrdering(false);
storeEventSource.setSynchronous(false);
syncListenersList.clear();
aSyncListenersList.clear();
syncListenersList.values().forEach(Collection::clear);
asyncListenersList.values().forEach(Collection::clear);
unOrderedExectuor.shutdown();
orderedExecutor.shutdown();
}
Expand All @@ -188,11 +207,13 @@ void onEvent(CacheEvent<K, V> event) {
} else {
executor = unOrderedExectuor;
}
if (!aSyncListenersList.isEmpty()) {
executor.submit(new EventDispatchTask<>(event, aSyncListenersList));
List<EventListenerWrapper<K, V>> asyncTargets = asyncListenersList.get(event.getType());
if (!asyncTargets.isEmpty()) {
executor.submit(new EventDispatchTask<>(event, asyncTargets));
}
if (!syncListenersList.isEmpty()) {
Future<?> future = executor.submit(new EventDispatchTask<>(event, syncListenersList));
List<EventListenerWrapper<K, V>> syncTargets = syncListenersList.get(event.getType());
if (!syncTargets.isEmpty()) {
Future<?> future = executor.submit(new EventDispatchTask<>(event, syncTargets));
try {
future.get();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,10 @@ class EventDispatchTask<K, V> implements Runnable {
@Override
public void run() {
for(EventListenerWrapper<K, V> listenerWrapper : listenerWrappers) {
if (listenerWrapper.isForEventType(cacheEvent.getType())) {
try {
listenerWrapper.onEvent(cacheEvent);
} catch (Exception e) {
LOGGER.warn(listenerWrapper.getListener() + " Failed to fire Event due to ", e);
}
try {
listenerWrapper.onEvent(cacheEvent);
} catch (Exception e) {
LOGGER.warn(listenerWrapper.getListener() + " Failed to fire Event due to ", e);
}
}
}
Expand Down

0 comments on commit 2420642

Please sign in to comment.