Handle events out of timestamp order #706

Open
chbatey opened this issue Feb 25, 2020 · 12 comments
Labels
1 - triaged Tickets that are safe to pick up for contributing in terms of likeliness of being accepted t:events-by-tag

Comments

@chbatey
Member

chbatey commented Feb 25, 2020

Cassandra and this plugin rely on clocks to be in sync otherwise the following can happen:

  • Actor persists event E1 @ time T1 on N1
  • Actor is stopped
  • Actor is started on N2, whose clock is behind N1's clock by more than the time elapsed since E1
  • Actor persists E2 @ Time T2 on N2

T2 can be < T1

Events by tag relies on the timestamp UUID always increasing so that restarting from an offset
won't miss events.

Events by tag also enforces that events for the same actor are delivered in seqNr order. This may or may not be important for processing done via events by tag.

In this scenario, both cannot be achieved. Either the events are delivered out of timeuuid order or out of seqNr order.
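
The conflict can be shown with a minimal sketch. Everything in it is an assumption made for illustration: `Event` and `ClockSkewExample` are hypothetical names, not plugin types, the 5 second skew is invented, and plain millisecond timestamps stand in for the time component of a timeuuid.

```scala
// Hypothetical model, not the plugin's types. Timestamps are plain millis
// standing in for the time component of a time-based UUID.
final case class Event(persistenceId: String, seqNr: Long, timestamp: Long)

object ClockSkewExample {
  // Assumed clock offsets: N2's clock is 5 seconds behind N1's.
  val n1ClockOffsetMs = 0L
  val n2ClockOffsetMs = -5000L

  // E2 is persisted one real second after E1, but on the node with the slow clock.
  val e1 = Event("pid-1", seqNr = 1, timestamp = 10000L + n1ClockOffsetMs)
  val e2 = Event("pid-1", seqNr = 2, timestamp = 11000L + n2ClockOffsetMs)

  val events = List(e1, e2)

  // The two delivery orders the query would like to guarantee at the same time.
  val byTimestamp: List[Long] = events.sortBy(_.timestamp).map(_.seqNr)
  val bySeqNr: List[Long] = events.sortBy(_.seqNr).map(_.seqNr)
}
```

Here ordering by timestamp puts E2 before E1, while ordering by seqNr puts E1 before E2, so a consumer can only be given one of the two guarantees.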

Right now TagWriter fails to persist the event: it cannot generate a tag pid sequence nr because it sees the events out of seqNr order when they are sent from the journal.

On a single node the UUIDs utility we use to generate type 1 UUIDs ensures that even if a clock goes backwards the time uuids only ever increase.
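
The single-node guarantee can be sketched roughly as follows. This is not the plugin's UUIDs utility, only a hypothetical `MonotonicTimestamps` class illustrating the general idea of never handing out a value lower than the previous one.

```scala
// Hypothetical sketch, not the plugin's UUIDs utility: hands out timestamps
// that strictly increase even if the supplied clock goes backwards.
final class MonotonicTimestamps(clock: () => Long) {
  private var last = Long.MinValue

  def next(): Long = synchronized {
    // Use the clock reading unless it is not ahead of the last value handed out.
    last = math.max(clock(), last + 1)
    last
  }
}
```

The `last` value lives in one JVM, so a guard like this cannot help when the actor moves to a different node with a slower clock, which is the scenario described above.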

We could:

  1. Fail more gracefully, saying that events for this persistent id won't appear in the events by tag queries until the data is manually fixed. Fixing the data isn't trivial as fake timestamps would need to be inserted
  2. Relax the requirement that events are delivered in seqNr order for a given persistenceId in the events by tag query
  3. Relax the requirement that events are delivered in timeuuid order
  4. The plugin could generate a different timeuuid for the events by tag query rather than use the one in the messages table. Writes to the tag_views table should be idempotent based on data in the messages table

This is a very rare edge case so I'm reluctant to add a lot of complexity for it. The simplest solution would be to add a flag to allow these events to be put in the tag_views table but then not guarantee events are delivered in seqNr order

@ffakenz

ffakenz commented Mar 24, 2020

Guys, I'm experiencing the same issue with TagWriter, the "Expected events to be ordered by seqNr" error.
I would like my persistent actor implementation to persist multiple events using the same tag, but this seems to be the source of my problem, because if I change the tag on every event then the seqNr order is maintained.
One last thing to mention: if I start the service on only one node the problem goes away, but I need it to scale.
What do you recommend I do?
Please I need you to push this issue as it is a main blocker for my application, as all the actors in all my services are tied to actor persistence and moving to another solution means a cost we cannot afford at this moment.
Looking forward to hearing from you.

FYI > I'm using cassandra as the journal and the akka-persistence plugin for it

@patriknw
Member

As stated in the first sentence of this ticket, "Cassandra and this plugin rely on clocks to be in sync". This is something Cassandra itself needs as well. Clocks can never be perfectly in sync, but being within something like a second shouldn't be a problem. Then the question is whether it's possible to fail over a persistent actor from one node to another within such a short clock skew?

@ffakenz When do you see this problem? Is it in rolling update scenarios? Do you use Cluster Sharding? Which version of Akka? Which version of the Cassandra plugin?

Please I need you to push this issue as it is a main blocker for my application

A friendly reminder that this is an open-source project and that kind of demand can't be made here. A more friendly tone would be appreciated.

We try to do our best to help all community users, but if you need full professional support or immediate priority of critical issues you should become a Lightbend customer.

@ffakenz

ffakenz commented Mar 25, 2020

Hey Patrik, thank you so much for your reply.
I'm sorry about the tone, please don't get me wrong, it was not my intention to sound unfriendly.
About the questions you asked:

  • When do you see this problem? Is it in rolling update scenarios? NO, any time while running
  • Do you use Cluster Sharding? YES, the actors that are showing problems are sharded.
  • Which version of Akka? 2.6.1
  • Which version of the Cassandra plugin? 0.101

@patriknw
Member

When do you see this problem? Is it in rolling update scenarios? NO, any time while running

So you are saying that several different actors persist events with the same tag and then some of them are not seen by the eventsByTag query? Is it stuck or are the events just not showing up? Is the eventsByTag query restarted when this happens?

Would be great if you can describe some more about how to trigger the problematic scenario.

@ffakenz

ffakenz commented Mar 25, 2020

The scenario is the following:

  • The application starts and fires up a sharded actor.
  • The application consumes some messages from Kafka and transforms them into commands, which it sends to the corresponding actors using Cluster Sharding.
  • On receiving a command, the actor persists a Tagged event to Cassandra.
  • The actor receives the command and persists the Tagged event successfully.
  • After some messages have been consumed, the following error arises from TagWriter:
    Expected events to be ordered by seqNr
  • As mentioned, the events are persisted successfully into akka.messages, but not all of them exist in akka.tag_views.

@patriknw
Member

The "Expected events to be ordered by seqNr" message also contains the persistenceId that it is having trouble with. Can you see if that corresponding actor was moved from one node to another by Cluster Sharding (rebalance)?

It would be interesting if you can share the events in the messages table for such persistence_id.

The messages table has a writer_uuid so we could see if different actors were writing the events.

select persistence_id, sequence_nr, toTimestamp(timestamp), timebucket, tags, writer_uuid from akka.messages where partition_nr=0 and persistence_id = '<the persistenceId>';
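
Once the rows are exported, a check along these lines could flag a persistence id that was written by more than one writer. It is only a sketch: `Row` and `WriterCheck` are hypothetical names, and only the column names are taken from the query above.

```scala
// Hypothetical row shape mirroring columns of the query above; not a plugin type.
final case class Row(persistenceId: String, sequenceNr: Long, writerUuid: String)

object WriterCheck {
  // Distinct writers, in order of first appearance by sequence_nr.
  def writers(rows: Seq[Row]): Seq[String] =
    rows.sortBy(_.sequenceNr).map(_.writerUuid).distinct

  // True if a writer reappears after a different writer has written in between,
  // which suggests two actor instances were alive at the same time.
  def interleaved(rows: Seq[Row]): Boolean = {
    // Collapse consecutive rows from the same writer into a single run.
    val runs = rows.sortBy(_.sequenceNr).map(_.writerUuid)
      .foldLeft(List.empty[String]) { (acc, w) =>
        if (acc.headOption.contains(w)) acc else w :: acc
      }
    runs.distinct.size < runs.size
  }
}
```

A clean failover shows up as one writer followed by another, whereas interleaved writers point at two live actors sharing a persistenceId.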

BTW, do you use clock synchronization of the nodes, with NTP or similar?

@ffakenz

ffakenz commented Apr 1, 2020

Patrik, thanks for the quick reply, and my apologies for the delay in my response; I wanted to create a minimal project to reproduce the bug (https://github.com/ffakenz/Example/).

The problem does not occur during a Cluster Sharding rebalance.

On the other hand, we are not using NTP or any other clock synchronization of the nodes.

The issue happens when you have the following combination:

  • nested (child) actors, where messages are forwarded and responses are sent back to the parent
  • using persistAll when processing a command in the parent
  • processing at least one different command in the parent using a simple persist
  • using tagged events (with the same tag for all of them)

I was able to overcome this issue by avoiding the use of persistAll (in the provided example there is only one event, but imagine my real prod code does try to persist multiple)

Looking forward to your comments, and I hope we can get some more insights about this :)

Once again thanks a lot for the help and support !

@patriknw
Member

patriknw commented Apr 2, 2020

Thanks for taking the time to create an example. I will take a look at that tomorrow.

@patriknw
Member

patriknw commented Apr 3, 2020

@ffakenz Thanks for a very well structured example. I have found that the problem is in your code. Two PersonActors are started with the same persistenceId.

If you add a log in the constructor of PersonActor

scribe.info(s"PersonActor started with pid [$persistenceId] at path [${self.path}]")

you will see

2020.04.03 07:59:05 [INFO] model.person.PersonActor:51:14 - PersonActor started with pid [PersonActor-1] at path [akka://ClusterExample/system/sharding/PersonActor/2/1]
2020.04.03 07:59:05 [INFO] model.person.PersonActor:51:14 - PersonActor started with pid [PersonActor-1] at path [akka://ClusterExample/system/sharding/PersonActor/1/1]

That is also detected by the plugin with this warning:

[2020-04-03 07:59:06,471] [WARN] [akka.persistence.cassandra.journal.TagWriters] [] [ClusterExample-akka.actor.default-dispatcher-17] - Persistent actor starting for pid [PersonActor-1]. Old ref hasn't terminated yet: [Actor[akka://ClusterExample/system/sharding/PersonActor/2/1#-1473043936]]. Persistent Actors with the same PersistenceId should not run concurrently MDC: {akkaAddress=akka://[email protected]:2551, sourceThread=ClusterExample-akka.persistence.cassandra.default-dispatcher-24, akkaSource=akka://[email protected]:2551/system/akka.persistence.cassandra.journal/tagWrites, sourceActorSystem=ClusterExample, akkaTimestamp=05:59:06.470UTC}

The problem is in your entityId/shardedId in the messages. You must make sure that a given entityId always resolves to the same value in extractShardId. Easiest would be to use s.entityId.hashCode there instead of that shardedId.

On a related note you should also add abs, since hashCode can be negative:

math.abs(s.entityId.hashCode % numberOfShards).toString
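
As a standalone sketch of that suggestion, the shard id can be derived from the entityId alone. `ShardIds` and `shardIdFor` are hypothetical names for illustration; in the real code the same expression would sit inside extractShardId.

```scala
object ShardIds {
  // Derives the shard id only from the entityId, so the same entity always
  // maps to the same shard regardless of which message carries it.
  // abs is applied after the modulo: math.abs(Int.MinValue) is still negative,
  // but the remainder is always small enough for abs to be safe.
  def shardIdFor(entityId: String, numberOfShards: Int): String =
    math.abs(entityId.hashCode % numberOfShards).toString
}
```

Because the result depends on nothing but the entityId and the fixed number of shards, two messages for the same entity can no longer start two actors with the same persistenceId in different shards.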

@ffakenz

ffakenz commented Apr 6, 2020

Patrik, thank you so much for taking the time and having the patience to run the code!
You are a beast!
I can't believe the team and I missed that warning :(
I apologize for the inconvenience.
Now we need to pay the price of fixing this all over the place xD
Have a great week man and best regards to the akka team. You guys rock !

@patriknw
Member

patriknw commented Apr 6, 2020

You're welcome!

@chbatey chbatey added 1 - triaged Tickets that are safe to pick up for contributing in terms of likeliness of being accepted t:events-by-tag labels Apr 14, 2020
@nolah

nolah commented Jan 11, 2022

Hi Akka team!

I would like to leave a ping on this issue as our team has faced it lately.

We are using Cassandra persistence.

Oddly, there is a part of the documentation that describes how to circumvent this issue, and we have it configured to repair-by-discard-old, but this configuration is only applied to ReplayFilter. An IllegalStateException is still thrown by akka.persistence.cassandra.journal.TagWriter for the same problem of events out of sequence.

Is there any chance the akka.persistence.journal.xxx.replay-filter.mode config could be applied to akka.persistence.cassandra.journal.TagWriter event processing?
