From e3a2c29a8939940d1d9424de4b4de9fef7efeff9 Mon Sep 17 00:00:00 2001 From: Michael Halberstadt Date: Tue, 1 Oct 2024 14:27:40 +0200 Subject: [PATCH] feat: add support for Hazelcast ReplicatedMap --- .../config/NeonBeeConfigConverter.java | 6 + src/main/java/io/neonbee/NeonBee.java | 12 +- .../io/neonbee/cache/CachingDataVerticle.java | 4 +- .../java/io/neonbee/config/NeonBeeConfig.java | 38 ++- .../endpoint/odatav4/ODataV4Endpoint.java | 52 ++-- .../io/neonbee/entity/EntityModelManager.java | 7 +- .../internal/SharedDataAccessorFactory.java | 58 ++++ .../neonbee/internal/WriteSafeRegistry.java | 14 +- .../cluster/entity/ClusterEntityRegistry.java | 22 +- .../hazelcast/ReplicatedAsyncMap.java | 174 +++++++++++ .../ReplicatedClusterEntityRegistry.java | 25 ++ .../hazelcast/ReplicatedDataAccessor.java | 64 ++++ .../ReplicatedWriteSafeRegistry.java | 23 ++ .../SharedDataAccessorFactoryTest.java | 33 ++ .../internal/deploy/DeploymentTest.java | 1 + .../hazelcast/ReplicatedAsyncMapTest.java | 291 ++++++++++++++++++ .../hazelcast/ReplicatedDataAccessorTest.java | 110 +++++++ 17 files changed, 901 insertions(+), 33 deletions(-) create mode 100644 src/main/java/io/neonbee/internal/SharedDataAccessorFactory.java create mode 100644 src/main/java/io/neonbee/internal/hazelcast/ReplicatedAsyncMap.java create mode 100644 src/main/java/io/neonbee/internal/hazelcast/ReplicatedClusterEntityRegistry.java create mode 100644 src/main/java/io/neonbee/internal/hazelcast/ReplicatedDataAccessor.java create mode 100644 src/main/java/io/neonbee/internal/hazelcast/ReplicatedWriteSafeRegistry.java create mode 100644 src/test/java/io/neonbee/internal/SharedDataAccessorFactoryTest.java create mode 100644 src/test/java/io/neonbee/internal/hazelcast/ReplicatedAsyncMapTest.java create mode 100644 src/test/java/io/neonbee/internal/hazelcast/ReplicatedDataAccessorTest.java diff --git a/src/generated/java/io/neonbee/config/NeonBeeConfigConverter.java b/src/generated/java/io/neonbee/config/NeonBeeConfigConverter.java index 3e0de603..9cd61e53 100644 --- a/src/generated/java/io/neonbee/config/NeonBeeConfigConverter.java +++ b/src/generated/java/io/neonbee/config/NeonBeeConfigConverter.java @@ -102,6 +102,11 @@ static void fromJson(Iterable> json, NeonBee obj.setTrackingDataHandlingStrategy((String) member.getValue()); } break; + case "useReplicatedMaps": + if (member.getValue() instanceof Boolean) { + obj.setUseReplicatedMaps((Boolean) member.getValue()); + } + break; case "verticleDeploymentTimeout": if (member.getValue() instanceof Number) { obj.setVerticleDeploymentTimeout(((Number) member.getValue()).intValue()); @@ -155,6 +160,7 @@ static void toJson(NeonBeeConfig obj, java.util.Map json) { if (obj.getTrackingDataHandlingStrategy() != null) { json.put("trackingDataHandlingStrategy", obj.getTrackingDataHandlingStrategy()); } + json.put("useReplicatedMaps", obj.isUseReplicatedMaps()); if (obj.getVerticleDeploymentTimeout() != null) { json.put("verticleDeploymentTimeout", obj.getVerticleDeploymentTimeout()); } diff --git a/src/main/java/io/neonbee/NeonBee.java b/src/main/java/io/neonbee/NeonBee.java index 506ab0b2..436d8b12 100644 --- a/src/main/java/io/neonbee/NeonBee.java +++ b/src/main/java/io/neonbee/NeonBee.java @@ -63,6 +63,7 @@ import io.neonbee.internal.Registry; import io.neonbee.internal.ReplyInboundInterceptor; import io.neonbee.internal.SharedDataAccessor; +import io.neonbee.internal.SharedDataAccessorFactory; import io.neonbee.internal.WriteSafeRegistry; import io.neonbee.internal.buffer.ImmutableBuffer; import io.neonbee.internal.cluster.ClusterHelper; @@ -75,6 +76,7 @@ import io.neonbee.internal.codec.ImmutableJsonObjectMessageCodec; import io.neonbee.internal.deploy.Deployable; import io.neonbee.internal.deploy.Deployables; +import io.neonbee.internal.hazelcast.ReplicatedClusterEntityRegistry; import io.neonbee.internal.helper.ConfigHelper; import io.neonbee.internal.helper.FileSystemHelper; import io.neonbee.internal.job.RedeployEntitiesJob; @@ -449,7 +451,8 @@ private Future registerHooks() { */ @VisibleForTesting Future initializeSharedMaps() { - SharedDataAccessor sharedData = new SharedDataAccessor(vertx, NeonBee.class); + SharedDataAccessor sharedData = new SharedDataAccessorFactory(this) + .getSharedDataAccessor(NeonBee.class); sharedLocalMap = sharedData.getLocalMap(SHARED_MAP_NAME); return sharedData.getAsyncMap(SHARED_MAP_NAME).onSuccess(asyncMap -> sharedAsyncMap = asyncMap) .mapEmpty(); @@ -677,7 +680,12 @@ private Future deployModules() { this.healthRegistry = new HealthCheckRegistry(vertx); this.modelManager = new EntityModelManager(this); if (vertx.isClustered()) { - this.entityRegistry = new ClusterEntityRegistry(vertx, EntityVerticle.REGISTRY_NAME); + if (config.isUseReplicatedMaps() && ClusterHelper.getHazelcastClusterManager(vertx).isPresent()) { + this.entityRegistry = + new ReplicatedClusterEntityRegistry(vertx, EntityVerticle.REGISTRY_NAME); + } else { + this.entityRegistry = new ClusterEntityRegistry(vertx, EntityVerticle.REGISTRY_NAME); + } } else { this.entityRegistry = new WriteSafeRegistry<>(vertx, EntityVerticle.REGISTRY_NAME); } diff --git a/src/main/java/io/neonbee/cache/CachingDataVerticle.java b/src/main/java/io/neonbee/cache/CachingDataVerticle.java index 111c263e..a0ae7970 100644 --- a/src/main/java/io/neonbee/cache/CachingDataVerticle.java +++ b/src/main/java/io/neonbee/cache/CachingDataVerticle.java @@ -28,6 +28,7 @@ import io.neonbee.data.DataRequest; import io.neonbee.data.DataVerticle; import io.neonbee.internal.SharedDataAccessor; +import io.neonbee.internal.SharedDataAccessorFactory; import io.vertx.core.Context; import io.vertx.core.Future; import io.vertx.core.Vertx; @@ -147,7 +148,8 @@ public void init(Vertx vertx, Context context) { // we will only need to retrieve locks if we coalesce requests if (coalescingTimeout > 0) { - sharedDataAccessor = new SharedDataAccessor(vertx, getClass()); + sharedDataAccessor = new SharedDataAccessorFactory(vertx) + .getSharedDataAccessor(getClass()); } } diff --git a/src/main/java/io/neonbee/config/NeonBeeConfig.java b/src/main/java/io/neonbee/config/NeonBeeConfig.java index 19012244..22de5209 100644 --- a/src/main/java/io/neonbee/config/NeonBeeConfig.java +++ b/src/main/java/io/neonbee/config/NeonBeeConfig.java @@ -35,7 +35,7 @@ /** * In contrast to the {@link NeonBeeOptions} the {@link NeonBeeConfig} is persistent configuration in a file. - * + *

* Whilst the {@link NeonBeeOptions} contain information which is to specify when NeonBee starts, such as the port of * the server to start on and the cluster to connect to, which potentially could be different across cluster nodes, the * {@link NeonBeeConfig} contains information which is mostly shared across different cluster nodes or you would like to @@ -100,6 +100,8 @@ public class NeonBeeConfig { private int jsonMaxStringSize; + private boolean useReplicatedMaps; + /** * Are the metrics enabled? * @@ -535,4 +537,38 @@ public NeonBeeConfig setJsonMaxStringSize(int jsonMaxStringSize) { public int getJsonMaxStringSize() { return jsonMaxStringSize; } + + /** + * Set the value to enable, disable replicated maps. + *

+ * Currently this feature is only supported for a Hazelcast-Cluster + *

+ * Replicated maps are a distributed data structure that provides a way to replicate data across multiple nodes in a + * cluster. Replicated maps are useful when you need to replicate data across multiple nodes in a cluster, and you + * don't need to partition the data. + *

+ * + * @return true if the replicated maps should be used, false otherwise + */ + public boolean isUseReplicatedMaps() { + return useReplicatedMaps; + } + + /** + * Set the value to enable, disable replicated maps. + *

+ * Currently this feature is only supported for a Hazelcast-Cluster + *

+ * Replicated maps are a distributed data structure that provides a way to replicate data across multiple nodes in a + * cluster. Replicated maps are useful when you need to replicate data across multiple nodes in a cluster, and you + * don't need to partition the data. + *

+ * + * @param useReplicatedMaps true if the replicated maps should be enabled, false otherwise + * @return the {@linkplain NeonBeeConfig} for fluent use + */ + public NeonBeeConfig setUseReplicatedMaps(boolean useReplicatedMaps) { + this.useReplicatedMaps = useReplicatedMaps; + return this; + } } diff --git a/src/main/java/io/neonbee/endpoint/odatav4/ODataV4Endpoint.java b/src/main/java/io/neonbee/endpoint/odatav4/ODataV4Endpoint.java index 7a4db353..9178b57a 100644 --- a/src/main/java/io/neonbee/endpoint/odatav4/ODataV4Endpoint.java +++ b/src/main/java/io/neonbee/endpoint/odatav4/ODataV4Endpoint.java @@ -29,7 +29,7 @@ import io.neonbee.endpoint.odatav4.internal.olingo.OlingoEndpointHandler; import io.neonbee.entity.EntityModel; import io.neonbee.internal.RegexBlockList; -import io.neonbee.internal.SharedDataAccessor; +import io.neonbee.internal.SharedDataAccessorFactory; import io.neonbee.logging.LoggingFacade; import io.vertx.core.Future; import io.vertx.core.Vertx; @@ -201,27 +201,35 @@ public Future createEndpointRouter(Vertx vertx, String basePath, JsonObj // when NeonBee is started and / or in case the endpoint is not used. Route initialRoute = router.route(); initialRoute.handler( - routingContext -> new SharedDataAccessor(vertx, ODataV4Endpoint.class).getLocalLock(asyncLock -> - // immediately initialize the router, this will also "arm" the event bus listener - (!initialized.getAndSet(true) - ? refreshRouter(vertx, router, basePath, uriConversion, exposedEntities, models) - : succeededFuture()).onComplete(handler -> { - // wait for the refresh to finish (the result doesn't matter), remove the initial route, as - // this will redirect all requests to the registered service endpoint handlers (if non have - // been registered, e.g. due to a failure in model loading, it'll result in an 404). Could - // have been removed already by refreshRouter, we don't care! - initialRoute.remove(); - if (asyncLock.succeeded()) { - // releasing the lock will cause other requests unblock and not call the initial route - asyncLock.result().release(); - } - - // let the router again handle the context again, now with either all service endpoints - // registered, or none in case there have been a failure while loading the models. - // NOTE: Re-route is the only elegant way I found to restart the current router to take - // the new routes! Might consider checking again with the Vert.x 4.0 release. - routingContext.reroute(routingContext.request().uri()); - }))); + routingContext -> new SharedDataAccessorFactory(vertx) + .getSharedDataAccessor(ODataV4Endpoint.class) + .getLocalLock(asyncLock -> + // immediately initialize the router, this will also "arm" the event bus listener + (!initialized.getAndSet(true) + ? refreshRouter(vertx, router, basePath, uriConversion, exposedEntities, models) + : succeededFuture()).onComplete(handler -> { + // wait for the refresh to finish (the result doesn't matter), remove the initial + // route, as + // this will redirect all requests to the registered service endpoint handlers (if + // non have + // been registered, e.g. due to a failure in model loading, it'll result in an 404). + // Could + // have been removed already by refreshRouter, we don't care! + initialRoute.remove(); + if (asyncLock.succeeded()) { + // releasing the lock will cause other requests unblock and not call the initial + // route + asyncLock.result().release(); + } + + // let the router again handle the context again, now with either all service + // endpoints + // registered, or none in case there have been a failure while loading the models. + // NOTE: Re-route is the only elegant way I found to restart the current router to + // take + // the new routes! Might consider checking again with the Vert.x 4.0 release. + routingContext.reroute(routingContext.request().uri()); + }))); return succeededFuture(router); } diff --git a/src/main/java/io/neonbee/entity/EntityModelManager.java b/src/main/java/io/neonbee/entity/EntityModelManager.java index 2093c913..7a1b0f02 100644 --- a/src/main/java/io/neonbee/entity/EntityModelManager.java +++ b/src/main/java/io/neonbee/entity/EntityModelManager.java @@ -16,7 +16,7 @@ import com.google.common.base.Functions; import io.neonbee.NeonBee; -import io.neonbee.internal.SharedDataAccessor; +import io.neonbee.internal.SharedDataAccessorFactory; import io.neonbee.logging.LoggingFacade; import io.vertx.core.Future; import io.vertx.core.eventbus.DeliveryOptions; @@ -147,7 +147,10 @@ public Future> getSharedModels() { } // if not try to reload the models and return the loaded data model - return new SharedDataAccessor(neonBee.getVertx(), EntityModelManager.class).getLocalLock() + + return new SharedDataAccessorFactory(neonBee) + .getSharedDataAccessor(EntityModelManager.class) + .getLocalLock() .transform(asyncLocalLock -> { Map retryModels = getBufferedModels(); if (retryModels != null) { diff --git a/src/main/java/io/neonbee/internal/SharedDataAccessorFactory.java b/src/main/java/io/neonbee/internal/SharedDataAccessorFactory.java new file mode 100644 index 00000000..ed872216 --- /dev/null +++ b/src/main/java/io/neonbee/internal/SharedDataAccessorFactory.java @@ -0,0 +1,58 @@ +package io.neonbee.internal; + +import io.neonbee.NeonBee; +import io.neonbee.config.NeonBeeConfig; +import io.neonbee.internal.cluster.ClusterHelper; +import io.neonbee.internal.hazelcast.ReplicatedDataAccessor; +import io.vertx.core.Vertx; + +/** + * Factory to create a {@link SharedDataAccessor} based on the configuration. + */ +public class SharedDataAccessorFactory { + private final NeonBee neonBee; + + /** + * Create a new instance of {@link SharedDataAccessorFactory}. + */ + public SharedDataAccessorFactory() { + this.neonBee = NeonBee.get(); + } + + /** + * Create a new instance of {@link SharedDataAccessorFactory}. + * + * @param vertx the Vert.x instance + */ + public SharedDataAccessorFactory(Vertx vertx) { + this.neonBee = NeonBee.get(vertx); + } + + /** + * Create a new instance of {@link SharedDataAccessorFactory}. + * + * @param neonBee the NeonBee instance + */ + public SharedDataAccessorFactory(NeonBee neonBee) { + this.neonBee = neonBee; + } + + private boolean useHazelcastReplicatedMaps() { + NeonBeeConfig config = neonBee.getConfig(); + return config.isUseReplicatedMaps() && ClusterHelper.getHazelcastClusterManager(neonBee.getVertx()).isPresent(); + } + + /** + * Get a {@link SharedDataAccessor} based on the configuration. + * + * @param accessClass the class to access the shared data + * @return the shared data accessor + */ + public SharedDataAccessor getSharedDataAccessor(Class accessClass) { + if (useHazelcastReplicatedMaps()) { + return new ReplicatedDataAccessor(neonBee.getVertx(), accessClass); + } else { + return new SharedDataAccessor(neonBee.getVertx(), accessClass); + } + } +} diff --git a/src/main/java/io/neonbee/internal/WriteSafeRegistry.java b/src/main/java/io/neonbee/internal/WriteSafeRegistry.java index dcd3a3fb..fdebfa6a 100644 --- a/src/main/java/io/neonbee/internal/WriteSafeRegistry.java +++ b/src/main/java/io/neonbee/internal/WriteSafeRegistry.java @@ -26,6 +26,17 @@ public class WriteSafeRegistry implements Registry { private final String registryName; + /** + * Create a new {@link WriteSafeRegistry}. + * + * @param registryName the name of the map registry + * @param sharedDataAccessor the shared data accessor + */ + protected WriteSafeRegistry(String registryName, SharedDataAccessor sharedDataAccessor) { + this.registryName = registryName; + this.sharedDataAccessor = sharedDataAccessor; + } + /** * Create a new {@link WriteSafeRegistry}. * @@ -33,8 +44,7 @@ public class WriteSafeRegistry implements Registry { * @param registryName the name of the map registry */ public WriteSafeRegistry(Vertx vertx, String registryName) { - this.registryName = registryName; - this.sharedDataAccessor = new SharedDataAccessor(vertx, this.getClass()); + this(registryName, new SharedDataAccessor(vertx, WriteSafeRegistry.class)); } /** diff --git a/src/main/java/io/neonbee/internal/cluster/entity/ClusterEntityRegistry.java b/src/main/java/io/neonbee/internal/cluster/entity/ClusterEntityRegistry.java index 5056a376..03afafac 100644 --- a/src/main/java/io/neonbee/internal/cluster/entity/ClusterEntityRegistry.java +++ b/src/main/java/io/neonbee/internal/cluster/entity/ClusterEntityRegistry.java @@ -48,6 +48,22 @@ public class ClusterEntityRegistry implements Registry { private final WriteSafeRegistry entityRegistry; + /** + * Create a new instance of {@link ClusterEntityRegistry}. + * + * @param vertx the {@link Vertx} instance + * @param entityRegistry the entity registry + * @param clusteringInformation the clustering information registry + */ + protected ClusterEntityRegistry( + Vertx vertx, + WriteSafeRegistry entityRegistry, + WriteSafeRegistry clusteringInformation) { + this.entityRegistry = entityRegistry; + this.clusteringInformation = clusteringInformation; + this.vertx = vertx; + } + /** * Create a new instance of {@link ClusterEntityRegistry}. * @@ -55,9 +71,9 @@ public class ClusterEntityRegistry implements Registry { * @param registryName the name of the map registry */ public ClusterEntityRegistry(Vertx vertx, String registryName) { - this.entityRegistry = new WriteSafeRegistry<>(vertx, registryName); - this.clusteringInformation = new WriteSafeRegistry<>(vertx, registryName + "#ClusteringInformation"); - this.vertx = vertx; + this(vertx, + new WriteSafeRegistry<>(vertx, registryName), + new WriteSafeRegistry<>(vertx, registryName + "#ClusteringInformation")); } @VisibleForTesting diff --git a/src/main/java/io/neonbee/internal/hazelcast/ReplicatedAsyncMap.java b/src/main/java/io/neonbee/internal/hazelcast/ReplicatedAsyncMap.java new file mode 100644 index 00000000..4bc40cea --- /dev/null +++ b/src/main/java/io/neonbee/internal/hazelcast/ReplicatedAsyncMap.java @@ -0,0 +1,174 @@ + +package io.neonbee.internal.hazelcast; + +import static io.vertx.spi.cluster.hazelcast.impl.ConversionUtils.convertParam; +import static io.vertx.spi.cluster.hazelcast.impl.ConversionUtils.convertReturn; +import static io.vertx.spi.cluster.hazelcast.impl.HazelcastServerID.convertServerID; +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import com.hazelcast.replicatedmap.ReplicatedMap; + +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.core.shareddata.AsyncMap; + +public class ReplicatedAsyncMap implements AsyncMap { + + private final Vertx vertx; + + private final ReplicatedMap map; + + /** + * Constructs a new instance of {@link ReplicatedAsyncMap}. + * + * @param vertx the Vert.x instance + * @param map the Hazelcast ReplicatedMap + */ + public ReplicatedAsyncMap(Vertx vertx, ReplicatedMap map) { + this.vertx = vertx; + this.map = map; + } + + @Override + public Future get(K k) { + K kk = convertParam(k); + return vertx.executeBlocking(() -> { + V vv = map.get(kk); + return convertReturn(vv); + }, false); + } + + @Override + public Future put(K k, V v) { + K kk = convertParam(k); + V vv = convertParam(v); + return vertx.executeBlocking(() -> { + map.put(kk, convertServerID(vv)); + return null; + }, false); + } + + @Override + public Future put(K k, V v, long ttl) { + K kk = convertParam(k); + V vv = convertParam(v); + return vertx.executeBlocking(() -> { + V prev = map.put(kk, convertServerID(vv), ttl, MILLISECONDS); + return convertReturn(prev); + }, false); + } + + @Override + public Future putIfAbsent(K k, V v) { + K kk = convertParam(k); + V vv = convertParam(v); + return vertx.executeBlocking(() -> { + V prev = map.putIfAbsent(kk, convertServerID(vv)); + return convertReturn(prev); + }, false); + } + + @Override + public Future putIfAbsent(K k, V v, long ttl) { + return get(k) + .compose(vv -> { + if (vv == null) { + return put(k, v, ttl).map(none -> v); + } else { + return Future.succeededFuture(); + } + }); + } + + @Override + public Future remove(K k) { + K kk = convertParam(k); + return vertx.executeBlocking(() -> { + V prev = map.remove(kk); + return convertReturn(prev); + }, false); + } + + @Override + public Future removeIfPresent(K k, V v) { + K kk = convertParam(k); + V vv = convertParam(v); + return vertx.executeBlocking(() -> map.remove(kk, vv), false); + } + + @Override + public Future replace(K k, V v) { + K kk = convertParam(k); + V vv = convertParam(v); + return vertx.executeBlocking(() -> { + V prev = map.replace(kk, vv); + return convertReturn(prev); + }, false); + } + + @Override + public Future replaceIfPresent(K k, V oldValue, V newValue) { + K kk = convertParam(k); + V oldVv = convertParam(oldValue); + V newVv = convertParam(newValue); + return vertx.executeBlocking(() -> map.replace(kk, oldVv, newVv), false); + } + + @Override + public Future clear() { + return vertx.executeBlocking(() -> { + map.clear(); + return null; + }, false); + } + + @Override + public Future size() { + return vertx.executeBlocking(map::size, false); + } + + @Override + public Future> keys() { + return vertx.executeBlocking(() -> { + Set set = new HashSet<>(); + for (K kk : map.keySet()) { + K k = convertReturn(kk); + set.add(k); + } + return set; + }, false); + } + + @Override + public Future> values() { + return vertx.executeBlocking(() -> { + List list = new ArrayList<>(); + for (V vv : map.values()) { + V v = convertReturn(vv); + list.add(v); + } + return list; + }, false); + } + + @Override + public Future> entries() { + return vertx.executeBlocking(() -> { + Map result = new HashMap<>(); + for (Entry entry : map.entrySet()) { + K k = convertReturn(entry.getKey()); + V v = convertReturn(entry.getValue()); + result.put(k, v); + } + return result; + }, false); + } +} diff --git a/src/main/java/io/neonbee/internal/hazelcast/ReplicatedClusterEntityRegistry.java b/src/main/java/io/neonbee/internal/hazelcast/ReplicatedClusterEntityRegistry.java new file mode 100644 index 00000000..ae6408a2 --- /dev/null +++ b/src/main/java/io/neonbee/internal/hazelcast/ReplicatedClusterEntityRegistry.java @@ -0,0 +1,25 @@ +package io.neonbee.internal.hazelcast; + +import io.neonbee.internal.cluster.entity.ClusterEntityRegistry; +import io.vertx.core.Vertx; + +/** + * A special registry implementation that stores cluster information's. + *

+ * This implementation stores note specific entries for the registered {@link EntityVerticle}. The cluster information + * is stored in a {@link ReplicatedWriteSafeRegistry}. + */ +public class ReplicatedClusterEntityRegistry extends ClusterEntityRegistry { + + /** + * Create a new instance of {@link ReplicatedClusterEntityRegistry}. + * + * @param vertx the {@link Vertx} instance + * @param registryName the name of the map registry + */ + public ReplicatedClusterEntityRegistry(Vertx vertx, String registryName) { + super(vertx, + new ReplicatedWriteSafeRegistry<>(vertx, registryName), + new ReplicatedWriteSafeRegistry<>(vertx, registryName + "#ClusteringInformation")); + } +} diff --git a/src/main/java/io/neonbee/internal/hazelcast/ReplicatedDataAccessor.java b/src/main/java/io/neonbee/internal/hazelcast/ReplicatedDataAccessor.java new file mode 100644 index 00000000..89976dab --- /dev/null +++ b/src/main/java/io/neonbee/internal/hazelcast/ReplicatedDataAccessor.java @@ -0,0 +1,64 @@ +package io.neonbee.internal.hazelcast; + +import com.hazelcast.core.HazelcastInstance; +import com.hazelcast.replicatedmap.ReplicatedMap; + +import io.neonbee.internal.SharedDataAccessor; +import io.neonbee.internal.cluster.ClusterHelper; +import io.vertx.core.AsyncResult; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.core.shareddata.AsyncMap; +import io.vertx.spi.cluster.hazelcast.HazelcastClusterManager; + +/** + * A SharedData implementation that uses Hazelcast ReplicatedMap for distributed maps. + */ +public class ReplicatedDataAccessor extends SharedDataAccessor { + + private final Vertx vertx; + + private final HazelcastInstance hazelcast; + + /** + * Constructs a new instance of {@link ReplicatedDataAccessor}. + * + * @param vertx the Vert.x instance + * @param accessClass the class of the shared data accessor + */ + public ReplicatedDataAccessor(Vertx vertx, Class accessClass) { + super(vertx, accessClass); + this.vertx = vertx; + this.hazelcast = ClusterHelper.getHazelcastClusterManager(vertx) + .map(HazelcastClusterManager::getHazelcastInstance) + .orElse(null); + } + + @Override + public void getClusterWideMap(String name, Handler>> asyncResultHandler) { + this.getClusterWideMap(name).onComplete(asyncResultHandler); + } + + @Override + public Future> getClusterWideMap(String name) { + return getAsyncMap(name); + } + + @Override + public void getAsyncMap(String name, Handler>> asyncResultHandler) { + this.getAsyncMap(name).onComplete(asyncResultHandler); + } + + @Override + public Future> getAsyncMap(String name) { + if (hazelcast == null) { + return super.getAsyncMap(name); + } else { + return vertx.executeBlocking(() -> { + ReplicatedMap map = hazelcast.getReplicatedMap(name); + return new ReplicatedAsyncMap<>(vertx, map); + }, false); + } + } +} diff --git a/src/main/java/io/neonbee/internal/hazelcast/ReplicatedWriteSafeRegistry.java b/src/main/java/io/neonbee/internal/hazelcast/ReplicatedWriteSafeRegistry.java new file mode 100644 index 00000000..6e6192de --- /dev/null +++ b/src/main/java/io/neonbee/internal/hazelcast/ReplicatedWriteSafeRegistry.java @@ -0,0 +1,23 @@ +package io.neonbee.internal.hazelcast; + +import io.neonbee.internal.WriteSafeRegistry; +import io.vertx.core.Vertx; + +/** + * A special registry implementation that stores cluster information's. + *

+ * This implementation stores note specific entries for the registered {@link io.neonbee.entity.EntityVerticle}. The + * cluster information is stored using a {@link ReplicatedDataAccessor}. + */ +public class ReplicatedWriteSafeRegistry extends WriteSafeRegistry { + + /** + * Create a new instance of {@link ReplicatedWriteSafeRegistry}. + * + * @param vertx the {@link Vertx} instance + * @param registryName the name of the map registry + */ + ReplicatedWriteSafeRegistry(Vertx vertx, String registryName) { + super(registryName, new ReplicatedDataAccessor(vertx, WriteSafeRegistry.class)); + } +} diff --git a/src/test/java/io/neonbee/internal/SharedDataAccessorFactoryTest.java b/src/test/java/io/neonbee/internal/SharedDataAccessorFactoryTest.java new file mode 100644 index 00000000..fa32e580 --- /dev/null +++ b/src/test/java/io/neonbee/internal/SharedDataAccessorFactoryTest.java @@ -0,0 +1,33 @@ +package io.neonbee.internal; + +import static io.neonbee.NeonBeeInstanceConfiguration.ClusterManager.HAZELCAST; +import static io.neonbee.test.base.NeonBeeTestBase.LONG_RUNNING_TEST; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; + +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ExecutionMode; +import org.junit.jupiter.api.parallel.Isolated; + +import io.neonbee.NeonBee; +import io.neonbee.NeonBeeExtension; +import io.neonbee.NeonBeeInstanceConfiguration; +import io.neonbee.internal.hazelcast.ReplicatedDataAccessor; + +@Tag(LONG_RUNNING_TEST) +@Execution(ExecutionMode.SAME_THREAD) +@ExtendWith(NeonBeeExtension.class) +@Isolated +class SharedDataAccessorFactoryTest { + + @Test + void testGetSharedDataAccessor( + @NeonBeeInstanceConfiguration(clustered = true, activeProfiles = {}, + clusterManager = HAZELCAST) NeonBee neonBee) { + SharedDataAccessor sharedDataAccessor = new SharedDataAccessorFactory(neonBee.getVertx()) + .getSharedDataAccessor(SharedDataAccessorFactoryTest.class); + assertInstanceOf(ReplicatedDataAccessor.class, sharedDataAccessor); + } +} diff --git a/src/test/java/io/neonbee/internal/deploy/DeploymentTest.java b/src/test/java/io/neonbee/internal/deploy/DeploymentTest.java index 4bd6667b..88b9b099 100644 --- a/src/test/java/io/neonbee/internal/deploy/DeploymentTest.java +++ b/src/test/java/io/neonbee/internal/deploy/DeploymentTest.java @@ -22,6 +22,7 @@ @ExtendWith(VertxExtension.class) class DeploymentTest { + @Test @DisplayName("should return the deployable") void getDeployableTest() { diff --git a/src/test/java/io/neonbee/internal/hazelcast/ReplicatedAsyncMapTest.java b/src/test/java/io/neonbee/internal/hazelcast/ReplicatedAsyncMapTest.java new file mode 100644 index 00000000..4d1a30c8 --- /dev/null +++ b/src/test/java/io/neonbee/internal/hazelcast/ReplicatedAsyncMapTest.java @@ -0,0 +1,291 @@ +package io.neonbee.internal.hazelcast; + +import static io.neonbee.NeonBeeInstanceConfiguration.ClusterManager.HAZELCAST; +import static io.neonbee.test.base.NeonBeeTestBase.LONG_RUNNING_TEST; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ExecutionMode; +import org.junit.jupiter.api.parallel.Isolated; + +import com.hazelcast.replicatedmap.ReplicatedMap; + +import io.neonbee.NeonBee; +import io.neonbee.NeonBeeExtension; +import io.neonbee.NeonBeeInstanceConfiguration; +import io.neonbee.internal.cluster.ClusterHelper; +import io.vertx.core.Future; +import io.vertx.junit5.Checkpoint; +import io.vertx.junit5.VertxTestContext; +import io.vertx.spi.cluster.hazelcast.HazelcastClusterManager; + +@Tag(LONG_RUNNING_TEST) +@Execution(ExecutionMode.SAME_THREAD) +@ExtendWith(NeonBeeExtension.class) +@Isolated +class ReplicatedAsyncMapTest { + + private static ReplicatedAsyncMap getHazelcastReplicatedMap(NeonBee neonBee) { + ReplicatedMap replicatedMap = ClusterHelper.getHazelcastClusterManager(neonBee.getVertx()) + .map(HazelcastClusterManager::getHazelcastInstance) + .map(hcm -> hcm.getReplicatedMap("test-map")) + .orElseThrow(() -> new IllegalStateException("Failed to get Hazelcast Replicated Map instance")); + return new ReplicatedAsyncMap<>(neonBee.getVertx(), replicatedMap); + } + + @Test + void testPutAndGet( + @NeonBeeInstanceConfiguration(clustered = true, activeProfiles = {}, + clusterManager = HAZELCAST) NeonBee neonBee, + VertxTestContext testContext) { + ReplicatedAsyncMap replicatedAsyncMap = getHazelcastReplicatedMap(neonBee); + + replicatedAsyncMap + .put("key1", "value1") + .compose(ar -> replicatedAsyncMap.get("key1")) + .onSuccess(value -> testContext.verify(() -> { + assertEquals("value1", value); + testContext.completeNow(); + })) + .onFailure(testContext::failNow); + } + + @Test + void testPutIfAbsent( + @NeonBeeInstanceConfiguration(clustered = true, activeProfiles = {}, + clusterManager = HAZELCAST) NeonBee neonBee, + VertxTestContext testContext) { + ReplicatedAsyncMap replicatedAsyncMap = getHazelcastReplicatedMap(neonBee); + + Checkpoint checkpoint = testContext.checkpoint(2); + replicatedAsyncMap + .putIfAbsent("key2", "value2") + .onSuccess(returnedValue -> testContext.verify(() -> { + assertNull(returnedValue); + checkpoint.flag(); + })) + .compose(o -> replicatedAsyncMap.putIfAbsent("key2", "value3")) + .onSuccess(returnedValue -> testContext.verify(() -> { + assertEquals("value2", returnedValue); + checkpoint.flag(); + })) + .onFailure(testContext::failNow); + } + + @Test + void testRemove( + @NeonBeeInstanceConfiguration(clustered = true, activeProfiles = {}, + clusterManager = HAZELCAST) NeonBee neonBee, + VertxTestContext testContext) { + ReplicatedAsyncMap replicatedAsyncMap = getHazelcastReplicatedMap(neonBee); + + Checkpoint checkpoint = testContext.checkpoint(2); + replicatedAsyncMap.put("key3", "value3") + .compose(ar -> replicatedAsyncMap.remove("key3")) + .onSuccess(removedValue -> testContext.verify(() -> { + assertEquals("value3", removedValue); + checkpoint.flag(); + })) + .compose(o -> replicatedAsyncMap.get("key3")) + .onSuccess(returnedValue -> testContext.verify(() -> { + assertNull(returnedValue); + checkpoint.flag(); + })) + .onFailure(testContext::failNow); + } + + @Test + void testReplace( + @NeonBeeInstanceConfiguration(clustered = true, activeProfiles = {}, + clusterManager = HAZELCAST) NeonBee neonBee, + VertxTestContext testContext) { + ReplicatedAsyncMap replicatedAsyncMap = getHazelcastReplicatedMap(neonBee); + + Checkpoint checkpoint = testContext.checkpoint(2); + replicatedAsyncMap.put("key4", "value4") + .compose(o -> replicatedAsyncMap.replace("key4", "newValue4")) + .onSuccess(returnedValue -> testContext.verify(() -> { + assertEquals("value4", returnedValue); + checkpoint.flag(); + })) + .compose(o -> replicatedAsyncMap.get("key4")) + .onSuccess(returnedValue -> testContext.verify(() -> { + assertEquals("newValue4", returnedValue); + checkpoint.flag(); + })) + .onFailure(testContext::failNow); + } + + @Test + void testReplaceIfPresent( + @NeonBeeInstanceConfiguration(clustered = true, activeProfiles = {}, + clusterManager = HAZELCAST) NeonBee neonBee, + VertxTestContext testContext) { + ReplicatedAsyncMap replicatedAsyncMap = getHazelcastReplicatedMap(neonBee); + + var checkpoint = testContext.checkpoint(3); + replicatedAsyncMap.put("key5", "value5") + .compose(o -> replicatedAsyncMap.replaceIfPresent("key5", null, "value5")) + .onSuccess(replaced -> testContext.verify(() -> { + assertFalse(replaced); + checkpoint.flag(); + })) + .compose(o -> replicatedAsyncMap.replaceIfPresent("key5", "value5", "newValue5")) + .onSuccess(replaced -> testContext.verify(() -> { + assertTrue(replaced); + checkpoint.flag(); + })) + .compose(o -> replicatedAsyncMap.get("key5")) + .onSuccess(returnedValue -> testContext.verify(() -> { + assertEquals("newValue5", returnedValue); + checkpoint.flag(); + })) + .onFailure(testContext::failNow); + } + + @Test + void testRemoveIfPresent( + @NeonBeeInstanceConfiguration(clustered = true, activeProfiles = {}, + clusterManager = HAZELCAST) NeonBee neonBee, + VertxTestContext testContext) { + ReplicatedAsyncMap replicatedAsyncMap = getHazelcastReplicatedMap(neonBee); + + var checkpoint = testContext.checkpoint(3); + replicatedAsyncMap.put("key6", "value") + .compose(o -> replicatedAsyncMap.removeIfPresent("key6", "otherValue")) + .onSuccess(removed -> testContext.verify(() -> { + assertFalse(removed); + checkpoint.flag(); + })) + .compose(o -> replicatedAsyncMap.removeIfPresent("key6", "value")) + .onSuccess(removed -> testContext.verify(() -> { + assertTrue(removed); + checkpoint.flag(); + })) + .compose(o -> replicatedAsyncMap.get("key")) + .onSuccess(returnedValue -> testContext.verify(() -> { + assertNull(returnedValue); + checkpoint.flag(); + })) + .onFailure(testContext::failNow); + } + + @Test + void testClear( + @NeonBeeInstanceConfiguration(clustered = true, activeProfiles = {}, + clusterManager = HAZELCAST) NeonBee neonBee, + VertxTestContext testContext) { + ReplicatedAsyncMap replicatedAsyncMap = getHazelcastReplicatedMap(neonBee); + + var checkpoint = testContext.checkpoint(2); + Future.all( + replicatedAsyncMap.put("key7", "value7"), + replicatedAsyncMap.put("key8", "value8")) + .compose(o -> replicatedAsyncMap.size()) + .onSuccess(size -> testContext.verify(() -> { + assertEquals(2, size); + checkpoint.flag(); + })) + .compose(o -> replicatedAsyncMap.clear()) + .compose(o -> replicatedAsyncMap.size()) + .onSuccess(size -> testContext.verify(() -> { + assertEquals(0, size); + checkpoint.flag(); + })) + .onFailure(testContext::failNow); + } + + @Test + void testSize( + @NeonBeeInstanceConfiguration(clustered = true, activeProfiles = {}, + clusterManager = HAZELCAST) NeonBee neonBee, + VertxTestContext testContext) { + ReplicatedAsyncMap replicatedAsyncMap = getHazelcastReplicatedMap(neonBee); + + var checkpoint = testContext.checkpoint(3); + replicatedAsyncMap.size() + .onSuccess(size -> testContext.verify(() -> { + assertEquals(0, size); + checkpoint.flag(); + })) + .compose(o -> replicatedAsyncMap.put("key9", "value9")) + .compose(o -> replicatedAsyncMap.size()) + .onSuccess(size -> testContext.verify(() -> { + assertEquals(1, size); + checkpoint.flag(); + })) + .compose(o -> replicatedAsyncMap.remove("key9")) + .compose(o -> replicatedAsyncMap.size()) + .onSuccess(size -> testContext.verify(() -> { + assertEquals(0, size); + checkpoint.flag(); + })) + .onFailure(testContext::failNow); + } + + @Test + void testKeys( + @NeonBeeInstanceConfiguration(clustered = true, activeProfiles = {}, + clusterManager = HAZELCAST) NeonBee neonBee, + VertxTestContext testContext) { + ReplicatedAsyncMap replicatedAsyncMap = getHazelcastReplicatedMap(neonBee); + + Future.all( + replicatedAsyncMap.put("key10", "value10"), + replicatedAsyncMap.put("key11", "value11")) + .compose(o -> replicatedAsyncMap.keys()) + .onSuccess(keys -> testContext.verify(() -> { + assertEquals(2, keys.size()); + assertTrue(keys.contains("key10")); + assertTrue(keys.contains("key11")); + testContext.completeNow(); + })) + .onFailure(testContext::failNow); + } + + @Test + void testValues( + @NeonBeeInstanceConfiguration(clustered = true, activeProfiles = {}, + clusterManager = HAZELCAST) NeonBee neonBee, + VertxTestContext testContext) { + ReplicatedAsyncMap replicatedAsyncMap = getHazelcastReplicatedMap(neonBee); + + Future.all( + replicatedAsyncMap.put("key12", "value12"), + replicatedAsyncMap.put("key13", "value13")) + .compose(o -> replicatedAsyncMap.values()) + .onSuccess(values -> testContext.verify(() -> { + assertEquals(2, values.size()); + assertTrue(values.contains("value12")); + assertTrue(values.contains("value13")); + testContext.completeNow(); + })) + .onFailure(testContext::failNow); + } + + @Test + void testEntries( + @NeonBeeInstanceConfiguration(clustered = true, activeProfiles = {}, + clusterManager = HAZELCAST) NeonBee neonBee, + VertxTestContext testContext) { + ReplicatedAsyncMap replicatedAsyncMap = getHazelcastReplicatedMap(neonBee); + + Future.all( + replicatedAsyncMap.put("key14", "value14"), + replicatedAsyncMap.put("key15", "value15")) + .compose(o -> replicatedAsyncMap.entries()) + .onSuccess(entries -> testContext.verify(() -> { + assertEquals(2, entries.size()); + assertEquals("value14", entries.get("key14")); + assertEquals("value15", entries.get("key15")); + testContext.completeNow(); + })) + .onFailure(testContext::failNow); + } +} diff --git a/src/test/java/io/neonbee/internal/hazelcast/ReplicatedDataAccessorTest.java b/src/test/java/io/neonbee/internal/hazelcast/ReplicatedDataAccessorTest.java new file mode 100644 index 00000000..b8ba8ba2 --- /dev/null +++ b/src/test/java/io/neonbee/internal/hazelcast/ReplicatedDataAccessorTest.java @@ -0,0 +1,110 @@ +package io.neonbee.internal.hazelcast; + +import static io.neonbee.NeonBeeInstanceConfiguration.ClusterManager.HAZELCAST; +import static io.neonbee.test.base.NeonBeeTestBase.LONG_RUNNING_TEST; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ExecutionMode; +import org.junit.jupiter.api.parallel.Isolated; + +import io.neonbee.NeonBee; +import io.neonbee.NeonBeeExtension; +import io.neonbee.NeonBeeInstanceConfiguration; +import io.vertx.core.Vertx; +import io.vertx.junit5.VertxTestContext; + +@Tag(LONG_RUNNING_TEST) +@Execution(ExecutionMode.SAME_THREAD) +@ExtendWith(NeonBeeExtension.class) +@Isolated +class ReplicatedDataAccessorTest { + + @Test + void getAsyncMap( + @NeonBeeInstanceConfiguration(clustered = true, activeProfiles = {}, + clusterManager = HAZELCAST) NeonBee neonBee, + VertxTestContext testContext) { + var dataAccessor = new ReplicatedDataAccessor(neonBee.getVertx(), ReplicatedDataAccessorTest.class); + + dataAccessor.getAsyncMap("testMap") + .onSuccess(asyncMap -> testContext.verify(() -> { + assertInstanceOf(ReplicatedAsyncMap.class, asyncMap); + testContext.completeNow(); + + })).onFailure(testContext::failNow); + } + + @Test + void getAsyncMapHandler( + @NeonBeeInstanceConfiguration(clustered = true, activeProfiles = {}, + clusterManager = HAZELCAST) NeonBee neonBee, + VertxTestContext testContext) { + var dataAccessor = new ReplicatedDataAccessor(neonBee.getVertx(), ReplicatedDataAccessorTest.class); + + dataAccessor.getAsyncMap("testMap", asyncMap -> testContext.verify(() -> { + if (asyncMap.succeeded()) { + assertInstanceOf(ReplicatedAsyncMap.class, asyncMap.result()); + testContext.completeNow(); + } else { + testContext.failNow(asyncMap.cause()); + } + })); + } + + @Test + void getAsyncMapWithoutHazelcast( + VertxTestContext testContext) { + Vertx vertx = Vertx.vertx(); + ReplicatedDataAccessor dataAccessor = new ReplicatedDataAccessor(vertx, ReplicatedDataAccessorTest.class); + + dataAccessor.getAsyncMap("testMap") + .onSuccess(asyncMap -> testContext.verify(() -> { + assertNotNull(asyncMap); + // Since Hazelcast is null, it should delegate to super.getAsyncMap + assertFalse(asyncMap instanceof ReplicatedAsyncMap); + testContext.completeNow(); + })).onFailure(testContext::failNow) + .compose(event -> vertx.close()); + } + + @Test + void getClusterWideMap( + @NeonBeeInstanceConfiguration(clustered = true, activeProfiles = {}, + clusterManager = HAZELCAST) NeonBee neonBee, + VertxTestContext testContext) { + ReplicatedDataAccessor dataAccessor = + new ReplicatedDataAccessor(neonBee.getVertx(), ReplicatedDataAccessorTest.class); + + dataAccessor.getClusterWideMap("clusterMap") + .onSuccess(asyncMap -> testContext.verify(() -> { + assertInstanceOf(ReplicatedAsyncMap.class, asyncMap); + testContext.completeNow(); + })) + .onFailure(testContext::failNow); + } + + @Test + void getClusterWideMapHandler( + @NeonBeeInstanceConfiguration(clustered = true, activeProfiles = {}, + clusterManager = HAZELCAST) NeonBee neonBee, + VertxTestContext testContext) { + ReplicatedDataAccessor dataAccessor = + new ReplicatedDataAccessor(neonBee.getVertx(), ReplicatedDataAccessorTest.class); + + dataAccessor.getClusterWideMap("clusterMap", asyncMap -> testContext.verify(() -> { + if (asyncMap.succeeded()) { + assertInstanceOf(ReplicatedAsyncMap.class, asyncMap.result()); + testContext.completeNow(); + } else { + testContext.failNow(asyncMap.cause()); + } + })); + } + +}