From 4dd4e7979d96c16f6bf023a27ae6a5a1d2f21509 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Sat, 29 Jul 2023 03:53:26 +0700 Subject: [PATCH] Fix PersistenceQuery ReadJournalFor thread safety issue --- .../Akka.Persistence.Query/PersistenceQuery.cs | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/src/core/Akka.Persistence.Query/PersistenceQuery.cs b/src/core/Akka.Persistence.Query/PersistenceQuery.cs index d8fe409a692..a4914001226 100644 --- a/src/core/Akka.Persistence.Query/PersistenceQuery.cs +++ b/src/core/Akka.Persistence.Query/PersistenceQuery.cs @@ -19,6 +19,7 @@ public sealed class PersistenceQuery : IExtension private readonly ExtendedActorSystem _system; private readonly ConcurrentDictionary _readJournalPluginExtensionIds = new(); private ILoggingAdapter _log; + private readonly object _lock = new (); public static PersistenceQuery Get(ActorSystem system) { @@ -34,8 +35,18 @@ public PersistenceQuery(ExtendedActorSystem system) public TJournal ReadJournalFor(string readJournalPluginId) where TJournal : IReadJournal { - var plugin = _readJournalPluginExtensionIds.GetOrAdd(readJournalPluginId, path => CreatePlugin(path, GetDefaultConfig()).GetReadJournal()); - return (TJournal)plugin; + if(_readJournalPluginExtensionIds.TryGetValue(readJournalPluginId, out var plugin)) + return (TJournal)plugin; + + lock (_lock) + { + if (_readJournalPluginExtensionIds.TryGetValue(readJournalPluginId, out plugin)) + return (TJournal)plugin; + + plugin = CreatePlugin(readJournalPluginId, GetDefaultConfig()).GetReadJournal(); + _readJournalPluginExtensionIds[readJournalPluginId] = plugin; + return (TJournal)plugin; + } } private IReadJournalProvider CreatePlugin(string configPath, Config config)