
Some events are skipped by Akka.Persistence.Query when under load. #313

Open
jaydeboer opened this issue Apr 19, 2023 · 24 comments

@jaydeboer

Version Information
Version of Akka.NET?

  <ItemGroup>
    <!--<PackageVersion Include="Akka" Version="1.5.2" />-->
    <PackageVersion Include="Akka.Cluster.Hosting" Version="1.5.2" />
    <PackageVersion Include="Akka.Discovery" Version="1.5.2" />
    <PackageVersion Include="Akka.Cluster" Version="1.5.2" />
    <PackageVersion Include="Akka.Hosting" Version="1.5.2" />
    <PackageVersion Include="Akka.HealthCheck.Hosting.Web" Version="1.5.0.1" />
    <PackageVersion Include="Akka.Management" Version="1.5.0" />
    <PackageVersion Include="Akka.Persistence.MongoDb" Version="1.5.1.1" />
    <PackageVersion Include="Mongo2Go" Version="3.1.3" />
    <PackageVersion Include="MongoDB.Driver" Version="2.19.1" />
    <PackageVersion Include="Petabridge.Cmd.Cluster.Sharding" Version="$(PbmVersion)" />
    <PackageVersion Include="Petabridge.Cmd.Cluster" Version="$(PbmVersion)" />
    <PackageVersion Include="Petabridge.Cmd.Remote" Version="$(PbmVersion)" />
  </ItemGroup>

Describe the bug
When a fair number of writes and reads happen at the same time, events are occasionally skipped. I have tested this against MongoDB running as a single-node cluster in Docker. I have reproduced the same behavior with both EventsByTag and AllEvents queries. It may take many thousands of events for one to be skipped.

To Reproduce
I have a sink setup as follows:

    protected override void OnReplaySuccess()
    {
        var materializer = Context.Materializer();
        var self = Self;
        var startingOffset = _lastProcessedId;
        _query.EventsByTag($"PIPELINE:{_versionedPipelineEndpoint.PipelineName}", Offset.Sequence(startingOffset))
            .Where(e => e.Event is IDataReceivedEvent)
            .Select(e => new SendData(GetTransformedDataFromEvent(e.Event), GetOffsetFromSequence(e.Offset)))
            .RunWith(Sink.ActorRef<SendData>(self, UnexpectedEndOfStream.Instance, ex => { _log.Info(ex.Message); return ex; }), materializer);
        base.OnReplaySuccess();
    }

Expected behavior
Each of the events would be sent to the subscriber.

Actual behavior
Some go missing.

Environment
.NET 7
Windows 11 and macOS both exhibit the same behavior.

@ptjhuang
Contributor

ptjhuang commented May 13, 2023

We have also been battling with this ever since going to production, and it has become more than a nuisance now that the system is getting heavier use.

TL;DR: the latest experiment we're testing is to use the linearizable read concern when querying with EventsByTag or AllEvents. I would appreciate it if @jaydeboer and others could test whether this is a solution for them too.

My experiments so far seem to suggest this is down to how writes take place, and what data is returned by queries from MongoDB.

There is a danger zone right after documents have been sent for writing. It seems that after the BsonTimestamp (Ordering field) is assigned to the documents to be written, the order in which those documents become visible to queries is not necessarily the same as the timestamp order.

What that means is that when querying for events with Ordering >= 100, you may get events 100 and 102, but not 101, which is still in flight. The code will of course next ask for Ordering >= 103, which means 101 has been skipped. But if you replay from 100 again some time later, you will now see your skipped event 101.

Here's how we came to this hypothesis:

  • Problem unreproducible when we deliberately look at Ordering < (now - 1 second) instead of < Int64.MaxValue (see the sketch after this list)
  • Problem unreproducible when using linearizable read concern, which waits for concurrent writes to finish before returning, see this discussion
  • Problem unreproducible when rehydrating actors - replay doesn't usually happen immediately after event writes
  • Problem unreproducible when concurrently persisting and querying with AllEvents from a single actor - writes from a single actor are serialized, while WriteMessagesAsync writes concurrently when messages come from multiple actors
  • Problem unreproducible when we have a slow subscriber querying with AllEvents - writes are well and truly settled into the database by the time the query gets to them
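
Here's a rough sketch of that first bullet, just to illustrate the idea - the Ordering field name and its BsonTimestamp encoding are assumed from the journal schema described above, and this is not the plugin's actual query code:

    using System;
    using System.Collections.Generic;
    using System.Threading.Tasks;
    using MongoDB.Bson;
    using MongoDB.Driver;

    static async Task<List<BsonDocument>> ReadSettledEvents(
        IMongoCollection<BsonDocument> journal, long fromOffset)
    {
        // Cap the page at "now - 1 second" so documents whose Ordering has been
        // assigned but whose write is not yet visible aren't silently skipped over.
        var cutoff = new BsonTimestamp(
            (int)DateTimeOffset.UtcNow.AddSeconds(-1).ToUnixTimeSeconds(), 0);

        var filter = Builders<BsonDocument>.Filter.And(
            Builders<BsonDocument>.Filter.Gt("Ordering", new BsonTimestamp(fromOffset)),
            Builders<BsonDocument>.Filter.Lt("Ordering", cutoff));

        return await journal.Find(filter)
            .Sort(Builders<BsonDocument>.Sort.Ascending("Ordering"))
            .ToListAsync();
    }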

For completeness, performance characteristics from our testing with 4M messages (200 actors, 20k msgs each), on a single-machine dev environment with MongoDB running in Docker as a single-node replica set:

  • Local read concern (currently the default) - causes skipping (~3500 msgs/sec written then read)
  • Snapshot read concern - causes skipping at 3.4 mins (~3500 msgs/sec written then read)
  • Majority read concern - causes skipping at 19 mins, although not frequently (~3000 msgs/sec written then read)
  • Linearizable read concern - OK (~25 mins, 2800 msgs/sec written then read)
  • Do not look at T-1s + local read concern - OK (~24 mins, 2800 msgs/sec written then read)

There's definitely a drop in performance, but I'd say it's better than skipping events.

You can change the read concern via the connection string.
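
For example, appending the option to the journal's connection string (host and database names are placeholders):

    mongodb://localhost:27017/akka?readConcernLevel=linearizable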

Test harness - I've omitted the test actor and events; they're super simple and do nothing but call Persist() to write the event.

public async Task All_events_should_project_correctly() {
    var actorCount = 200;
    var msgCount = 20_000;
            
    Log.Info($"TenantId {TestTenantId}");

    // Setup actors
    Func<int, WidgetId> getActorId = actorId => WidgetId.With(Guid.Parse($"00000000-0000-0000-0000-{actorId:000000000000}"));
    Func<WidgetId, string> toDisplay = id => $"widget_{id.GetGuid().ToString("n").TrimStart('0')}";

    var actors = Enumerable.Range(1, actorCount)
        .Select(i => Sys.ActorOf(Props.Create(() => new WidgetActor(getActorId(i), TestTenantId))))
        .ToArray();

    // Concurrently issue command to all actors to cause concurrent write
    _ = Source.UnfoldInfinite(0, msgId => (msgId + 1, msgId + 1))
        .Take(msgCount)
        .SelectAsync(actorCount, async msgId => {
            return await Task.WhenAll(actors.Select((actor, i) => {
                var cmd = new WidgetCommand(getActorId(i + 1),
                    BatchTestFailMode.None,
                    null);
                return actor.Ask<WidgetCommandResult>(cmd);
            }));
        })
        .SelectMany(results => results)
        .RunWith(Sink.Ignore<WidgetCommandResult>(), Sys.Materializer());

    // Subscribe using AllEvents
    await ((Source<long, NotUsed>)MongoJournalQuery
        .AllEvents(Offset.NoOffset())
        .GroupBy(int.MaxValue, env => ((ICommittedEvent<WidgetEvent>)env.Event).Data.Id)
        .Scan((-1L, ImmutableArray<long>.Empty), (prev, currEnv) => {
            var curr = (ICommittedEvent<WidgetEvent>)currEnv.Event;
            var (prevSeq, prevList) = prev;
            var currSeq = curr.Data.Sequence;
            if (currSeq != prevSeq + 1) {
                throw new Exception($"Received {currSeq} from {toDisplay(curr.Data.Id)}, expecting {string.Join("; ", prevList.TakeLast(5))}; [{prevSeq + 1}] ({currEnv.Offset.ToInt64()})");
            }
            return (prevSeq + 1, prevList.Add(currSeq));
        })
        .Select(t => 0L)
        .MergeSubstreams())
        .Take((msgCount + 1) * actorCount) // Scan produces 1 extra initial message
        .IdleTimeout(TimeSpan.FromSeconds(300))
        .RunWith(Sink.Ignore<long>(), Sys.Materializer());
}

@ptjhuang
Contributor

Also noted that @jaydeboer reported this once before, and it was suspected that this would be fixed by writing in a transaction.

I think that if the transaction covered all of the AtomicWrites in the batch, rather than just a single AtomicWrite, we could use a read concern with weaker guarantees.

FYI @Aaronontheweb

@jaydeboer
Author

@ptjhuang Thanks for the tip. I have tried out the linearizable read concern and it almost eliminated the missed events in my setup, taking it from 100+ misses per ~10K events down to 3. The other thing I have noticed is that events appear to be skipped only on the same node where the event was written. I have another role in the cluster reading the same event streams with the same query configuration, and on those nodes no events seem to be missed, no matter what the read concern is.

@Aaronontheweb
Member

@jaydeboer @ptjhuang is there something we can do in our defaults for the Akka.Persistence.MongoDb driver to address this internally?

@Aaronontheweb
Member

Also, I need to take a look at #318 - my fault; I've been doing a lot of traveling since late May. I'll be done traveling in two weeks.

@jaydeboer
Author

@Aaronontheweb there isn't anything I have seen so far, but maybe @ptjhuang has better info than I do.

@ptjhuang
Contributor

ptjhuang commented Jul 9, 2023

@jaydeboer

The other thing I have noticed is that events appear to be skipped only on the same node where the event was written. I have another role in the cluster reading the same event streams with the same query configuration, and on those nodes no events seem to be missed, no matter what the read concern is.

This leads me to suspect it's to do with the way AsyncWriteJournal is implemented, where calls to NotifyTagChange trigger reads while other concurrent writes are in progress. This only happens on reads where writes took place on the same node.

https://github.com/akkadotnet/Akka.Persistence.MongoDB/blob/dev/src/Akka.Persistence.MongoDb/Journal/MongoDbJournal.cs#L289

What are your thoughts on a ReadWriteSequencer in the MongoDB journal to let concurrent writes finish before reads, @Aaronontheweb?
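
Roughly the shape I have in mind - a sketch of the idea only, not existing plugin code, and the type name is hypothetical:

    using System;
    using System.Collections.Concurrent;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;

    // Reads wait for every write that was already in flight when the read started.
    public sealed class ReadWriteSequencer
    {
        private readonly ConcurrentDictionary<long, Task> _inFlightWrites = new();
        private long _nextId;

        public async Task<T> TrackWrite<T>(Func<Task<T>> write)
        {
            var id = Interlocked.Increment(ref _nextId);
            var task = write();
            _inFlightWrites[id] = task;
            try { return await task; }
            finally { _inFlightWrites.TryRemove(id, out _); }
        }

        public async Task<T> SequencedRead<T>(Func<Task<T>> read)
        {
            // Snapshot the writes in flight right now; writes started later don't block this read.
            foreach (var write in _inFlightWrites.Values.ToArray())
            {
                try { await write; } catch { /* write failures are reported elsewhere */ }
            }
            return await read();
        }
    }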

@Aaronontheweb
Member

This leads me to suspect it's to do with the way AsyncWriteJournal is implemented, where calls to NotifyTagChange trigger reads while other concurrent writes are in progress. This only happens on reads where writes took place on the same node.

That's old code that needs to be done away with - I hope we're not still using that to trigger subscriber updates. That should all be done entirely through Db polling.

@Aaronontheweb
Member

What's your thoughts of a ReadWriteSequencer in the MongoDB journal to let concurrent writes finish before reads @Aaronontheweb ?

We have an issue similar to this on the SQL query plugins as well - "missed reads" usually occur only when write volumes are higher and there are transactions that haven't been fully committed but are included in the current "page" accessed by the query.

A sequencer would probably be a good idea - that's something that either tagged queries or AllEvents will need in order to not suffer from the same issue. I know how we could do this for SQL Server, since all of the Ordering values are sequential, but what about for MongoDB?
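
For the sequential (SQL-style) case the idea would be roughly this - a hypothetical helper, not an existing API: only advance the query offset to the highest contiguous ordering seen, so a row that hasn't become visible yet parks the offset just before the gap until the next poll.

    using System.Collections.Generic;
    using System.Linq;

    static long AdvanceOffset(long lastProcessed, IEnumerable<long> orderingsInPage)
    {
        foreach (var ordering in orderingsInPage.OrderBy(o => o))
        {
            if (ordering == lastProcessed + 1)
                lastProcessed = ordering;  // contiguous: safe to advance past it
            else if (ordering > lastProcessed + 1)
                break;                     // gap: an uncommitted row may still appear here
        }
        return lastProcessed;
    }

That trick doesn't transfer directly to MongoDB's BsonTimestamp-based Ordering, since those values aren't dense - which is exactly the open question above.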

@Aaronontheweb
Member

cc @ptjhuang @jaydeboer

@Arkatufus
Contributor

Arkatufus commented Sep 15, 2023

@ptjhuang @jaydeboer @Aaronontheweb
Ok, this is based on the latest code running the performance unit test inside the CI/CD environment, which is very poorly provisioned (very limited compute resources).

Edit: Note that these performance unit tests are not running under transactions, i.e. the transaction flag was turned off.

[ERROR][09/15/2023 14:05:06.124Z][Thread 0094][akka://MongoDbJournalPerfSpec/user/$a] Rejected to persist event type [Akka.Persistence.TestKit.Performance.Cmd] with sequence number [837] for persistenceId [PersistAsyncPid] due to [The wait queue for acquiring a connection to server 127.0.0.1:27034 is full.].
Cause: MongoDB.Driver.MongoWaitQueueFullException: The wait queue for acquiring a connection to server 127.0.0.1:27034 is full.
   at MongoDB.Driver.Core.ConnectionPools.ExclusiveConnectionPool.AcquireConnectionHelper.AcquireWaitQueueSlot()
   at MongoDB.Driver.Core.ConnectionPools.ExclusiveConnectionPool.AcquireConnectionHelper.StartCheckingOut()
   at MongoDB.Driver.Core.ConnectionPools.ExclusiveConnectionPool.AcquireConnectionHelper.AcquireConnectionAsync(CancellationToken cancellationToken)
   at MongoDB.Driver.Core.ConnectionPools.ExclusiveConnectionPool.AcquireConnectionAsync(CancellationToken cancellationToken)
   at MongoDB.Driver.Core.Servers.Server.GetChannelAsync(CancellationToken cancellationToken)
   at MongoDB.Driver.Core.Operations.RetryableWriteContext.InitializeAsync(CancellationToken cancellationToken)
   at MongoDB.Driver.Core.Operations.RetryableWriteContext.CreateAsync(IWriteBinding binding, Boolean retryRequested, CancellationToken cancellationToken)
   at MongoDB.Driver.Core.Operations.BulkMixedWriteOperation.ExecuteAsync(IWriteBinding binding, CancellationToken cancellationToken)
   at MongoDB.Driver.OperationExecutor.ExecuteWriteOperationAsync[TResult](IWriteBinding binding, IWriteOperation`1 operation, CancellationToken cancellationToken)
   at MongoDB.Driver.MongoCollectionImpl`1.ExecuteWriteOperationAsync[TResult](IClientSessionHandle session, IWriteOperation`1 operation, CancellationToken cancellationToken)
   at MongoDB.Driver.MongoCollectionImpl`1.BulkWriteAsync(IClientSessionHandle session, IEnumerable`1 requests, BulkWriteOptions options, CancellationToken cancellationToken)
   at MongoDB.Driver.MongoCollectionImpl`1.UsingImplicitSessionAsync[TResult](Func`2 funcAsync, CancellationToken cancellationToken)
   at Akka.Persistence.MongoDb.Journal.MongoDbJournal.<>c__DisplayClass22_0.<<InsertEntries>b__0>d.MoveNext() in D:\a\1\s\src\Akka.Persistence.MongoDb\Journal\MongoDbJournal.cs:line 336
--- End of stack trace from previous location ---
   at Akka.Persistence.MongoDb.Journal.MongoDbJournal.MaybeWithTransaction(Func`3 act, CancellationToken token) in D:\a\1\s\src\Akka.Persistence.MongoDb\Journal\MongoDbJournal.cs:line 172
   at Akka.Persistence.MongoDb.Journal.MongoDbJournal.InsertEntries(IMongoCollection`1 collection, IEnumerable`1 entries, CancellationToken token) in D:\a\1\s\src\Akka.Persistence.MongoDb\Journal\MongoDbJournal.cs:line 337
   at Akka.Persistence.MongoDb.Journal.MongoDbJournal.<>c__DisplayClass21_0.<<WriteMessagesAsync>b__0>d.MoveNext() in D:\a\1\s\src\Akka.Persistence.MongoDb\Journal\MongoDbJournal.cs:line 309
[ERROR][09/15/2023 14:05:06.125Z][Thread 0112][akka://MongoDbJournalPerfSpec/user/$a] Expected to receive [837] yet got: [838]
Cause: System.ArgumentException: Expected to receive [837] yet got: [838]
   at Akka.Persistence.TestKit.Performance.BenchActor.<OnCommand>b__14_2(Cmd d)
   at Akka.Persistence.Eventsourced.<>c__DisplayClass63_0`1.<PersistAsync>b__0(Object o)
   at Akka.Persistence.Eventsourced.PeekApplyHandler(Object payload)
   at Akka.Persistence.Eventsourced.CommonProcessingStateBehavior(Object message, Action`1 onWriteMessageComplete)
   at Akka.Persistence.Eventsourced.<ProcessingCommands>b__95_1(Receive receive, Object message)
   at Akka.Persistence.Eventsourced.AroundReceive(Receive receive, Object message)
   at Akka.Actor.ActorCell.ReceiveMessage(Object message)
   at Akka.Actor.ActorCell.Invoke(Envelope envelope)
[INFO][09/15/2023 14:05:06.167Z][Thread 0076][akka://MongoDbJournalPerfSpec/user/$a] Message [WriteMessageRejected] from [akka://MongoDbJournalPerfSpec/system/testActor200#2035819897] to [akka://MongoDbJournalPerfSpec/user/$a#87312486] was not delivered. [1] dead letters encountered. If this is not an expected behavior then [akka://MongoDbJournalPerfSpec/user/$a#87312486] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. Message content: WriteMessageRejected<actorInstanceId: 239, message: Persistent<pid: PersistAsyncPid, seqNr: 839, deleted: False, manifest: , sender: [akka://MongoDbJournalPerfSpec/system/testActor200#2035819897], payload: Akka.Persistence.TestKit.Performance.Cmd, writerGuid: 8de0210d-ab56-4925-a540-4fcbe0ed1385>, cause: MongoDB.Driver.MongoWaitQueueFullException: The wait queue for acquiring a connection to server 127.0.0.1:27034 is full.
   at MongoDB.Driver.Core.ConnectionPools.ExclusiveConnectionPool.AcquireConnectionHelper.AcquireWaitQueueSlot()
   at MongoDB.Driver.Core.ConnectionPools.ExclusiveConnectionPool.AcquireConnectionHelper.StartCheckingOut()
   at MongoDB.Driver.Core.ConnectionPools.ExclusiveConnectionPool.AcquireConnectionHelper.AcquireConnectionAsync(CancellationToken cancellationToken)
   at MongoDB.Driver.Core.ConnectionPools.ExclusiveConnectionPool.AcquireConnectionAsync(CancellationToken cancellationToken)
   at MongoDB.Driver.Core.Servers.Server.GetChannelAsync(CancellationToken cancellationToken)
   at MongoDB.Driver.Core.Operations.RetryableWriteContext.InitializeAsync(CancellationToken cancellationToken)
   at MongoDB.Driver.Core.Operations.RetryableWriteContext.CreateAsync(IWriteBinding binding, Boolean retryRequested, CancellationToken cancellationToken)
   at MongoDB.Driver.Core.Operations.BulkMixedWriteOperation.ExecuteAsync(IWriteBinding binding, CancellationToken cancellationToken)
   at MongoDB.Driver.OperationExecutor.ExecuteWriteOperationAsync[TResult](IWriteBinding binding, IWriteOperation`1 operation, CancellationToken cancellationToken)
   at MongoDB.Driver.MongoCollectionImpl`1.ExecuteWriteOperationAsync[TResult](IClientSessionHandle session, IWriteOperation`1 operation, CancellationToken cancellationToken)
   at MongoDB.Driver.MongoCollectionImpl`1.BulkWriteAsync(IClientSessionHandle session, IEnumerable`1 requests, BulkWriteOptions options, CancellationToken cancellationToken)
   at MongoDB.Driver.MongoCollectionImpl`1.UsingImplicitSessionAsync[TResult](Func`2 funcAsync, CancellationToken cancellationToken)
   at Akka.Persistence.MongoDb.Journal.MongoDbJournal.<>c__DisplayClass22_0.<<InsertEntries>b__0>d.MoveNext() in D:\a\1\s\src\Akka.Persistence.MongoDb\Journal\MongoDbJournal.cs:line 336
--- End of stack trace from previous location ---
   at Akka.Persistence.MongoDb.Journal.MongoDbJournal.MaybeWithTransaction(Func`3 act, CancellationToken token) in D:\a\1\s\src\Akka.Persistence.MongoDb\Journal\MongoDbJournal.cs:line 172
   at Akka.Persistence.MongoDb.Journal.MongoDbJournal.InsertEntries(IMongoCollection`1 collection, IEnumerable`1 entries, CancellationToken token) in D:\a\1\s\src\Akka.Persistence.MongoDb\Journal\MongoDbJournal.cs:line 337
   at Akka.Persistence.MongoDb.Journal.MongoDbJournal.<>c__DisplayClass21_0.<<WriteMessagesAsync>b__0>d.MoveNext() in D:\a\1\s\src\Akka.Persistence.MongoDb\Journal\MongoDbJournal.cs:line 309>

I'm not sure whether it's because of the new transaction code that we've managed to capture these new failure modes, but it seems the missing sequence numbers were caused by connection refusals: the wait queue for acquiring a connection to the server was saturated. MongoDB's default wait-queue behaviour is to drop the newest connection attempt when the queue is full.

I'm not a MongoDB expert, so I don't know whether this wait queue is implemented server side or client side, and I'm still unsure how to implement the fix. The correct fix would be a backpressure mechanism to prevent lost messages; a retry mechanism would actually make the problem worse because it would hammer the server with even more connection attempts.
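
To illustrate what I mean by backpressure rather than retry - a sketch only, with an assumed concurrency limit, not how the plugin currently works - the idea is to cap the number of concurrent bulk writes below the driver's wait-queue capacity so callers queue in-process instead of overflowing the connection pool:

    using System;
    using System.Threading;
    using System.Threading.Tasks;

    public sealed class BoundedWriter
    {
        private readonly SemaphoreSlim _slots;

        public BoundedWriter(int maxConcurrentWrites) =>
            _slots = new SemaphoreSlim(maxConcurrentWrites, maxConcurrentWrites);

        public async Task Write(Func<Task> bulkInsert, CancellationToken token)
        {
            await _slots.WaitAsync(token); // wait here instead of in the Mongo driver's wait queue
            try { await bulkInsert(); }
            finally { _slots.Release(); }
        }
    }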

@Aaronontheweb
Member

Wasn't @jaydeboer's issue about missing reads of events that had already been successfully written, not the writes themselves? cc @Arkatufus

@Arkatufus
Contributor

Arkatufus commented Sep 15, 2023

I'm not sure - were the messages actually written, @jaydeboer? If they were, then this is a new, separate issue.

@jaydeboer
Author

@Arkatufus The issue I was having was that the events would be written to Mongo, but if the query was running on the same node as the actor that journaled the event, some events would not be "seen" by the query. However, if the query was running on a different node, all events were seen. Does that help at all?

@Arkatufus
Contributor

Arkatufus commented Sep 15, 2023

Yes, there's a good chance that this will be fixed in the latest release, knock on wood.

@Arkatufus
Contributor

@jaydeboer @ptjhuang

I'm confident that this issue will be resolved by the next release, so I'm going to close it for now.
Please re-open it if the next release does not fix the issue.

@Arkatufus
Contributor

Some clarification on this issue and why I thought the latest release would fix it:
The MongoDbJournal inherits a piece of code from the old SQLite implementation that, at the time, was considered the "single source of truth" for building a custom journal. Inside it was a simple pub-sub mechanism that triggers the Persistence.Query event publisher to immediately poll the database for new changes as soon as any events are written to the database by the journal.

The problem we're having with the MongoDbJournal is that it performs all of its writes inside async code. In a write-heavy environment, it is possible for events to be written out of order due to this concurrency.

If writes happen fast enough that two writes complete out of order, and the read operation is done in a way that disregards data consistency ("local" or "available" read concern, and possibly even stronger levels), then it is possible for the event publisher to read the out-of-order write first, skipping a sequence number in the process.

@Arkatufus
Contributor

We removed the pub-sub mechanism in the latest release. The event publisher no longer tries to read database changes immediately after they are written; instead, it relies on its internal timer to poll the database for new events.

@jaydeboer
Author

That sounds like a fix to me! Thanks for all the help!

@jaydeboer
Author

jaydeboer commented Mar 21, 2024

I hate to resurrect a closed issue, but the project I was using this on ended up being abandoned. I am now back on a new project and ran into this old friend again. It still looks to be an issue in version 1.5.12.1. I have added readConcernLevel=linearizable to my connection string, and I also had to disable write transactions with use-write-transaction = false in my HOCON to get around the issue.
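
For reference, the workaround looks roughly like this - the connection-string key and section path are assumed from the standard Akka.Persistence.MongoDb HOCON layout, so adjust them to your setup:

    akka.persistence.journal.mongodb {
      connection-string = "mongodb://localhost:27017/akka?readConcernLevel=linearizable"
      use-write-transaction = false
    }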

Aaronontheweb reopened this Mar 21, 2024
@Aaronontheweb
Member

I also had to disable write transactions with use-write-transaction = false in my HOCON to get around the issue.

So that part blows my mind a bit - using transactions actually made this issue worse?

@jaydeboer
Author

The transactions didn't make it worse; I got an error message from MongoDB saying that linearizable is not a valid read concern when using transactions. I tried other read concern levels and the issue would still happen sometimes. Switching to linearizable, and disabling transactions to please Mongo, made it so that I am no longer able to reproduce the issue.

@Aaronontheweb
Member

Ah, transactions on read is what it was complaining about?

@jaydeboer
Author

Recovery failed for <ACTOR> with error Command find failed: The readConcern level must be either 'local' (default), 'majority' or 'snapshot' in order to run in a transaction.
      Cause: MongoDB.Driver.MongoCommandException: Command find failed: The readConcern level must be either 'local' (default), 'majority' or 'snapshot' in order to run in a transaction.
         at MongoDB.Driver.Core.WireProtocol.CommandUsingCommandMessageWireProtocol`1.ProcessResponse(ConnectionId connectionId, CommandMessage responseMessage)

That is the error. It looks like it is on the read side.
