Skip to content

Commit

Permalink
perf: improve deletion, avoid multiple rows updating.
Browse files Browse the repository at this point in the history
  • Loading branch information
Roiocam committed Apr 19, 2024
1 parent 257341d commit 93e83af
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 71 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.persistence.jdbc.journal.dao.JournalQueries.markJournalMessagesAsDeleted")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.persistence.jdbc.journal.dao.legacy.JournalQueries.markJournalMessagesAsDeleted")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.persistence.jdbc.journal.dao.JournalQueries.highestMarkedSequenceNrForPersistenceId")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.persistence.jdbc.journal.dao.legacy.JournalQueries.highestMarkedSequenceNrForPersistenceId")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.persistence.jdbc.journal.dao.JournalQueries.selectByPersistenceIdAndMaxSequenceNumber")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.persistence.jdbc.journal.dao.legacy.JournalQueries.selectByPersistenceIdAndMaxSequenceNumber")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.persistence.jdbc.journal.dao.JournalQueries.allPersistenceIdsDistinct")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.persistence.jdbc.journal.dao.legacy.JournalQueries.allPersistenceIdsDistinct")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.persistence.jdbc.journal.dao.JournalQueries.journalRowByPersistenceIds")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.persistence.jdbc.journal.dao.legacy.JournalQueries.journalRowByPersistenceIds")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.persistence.jdbc.query.dao.ReadJournalQueries.journalRowByPersistenceIds")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.persistence.jdbc.query.dao.legacy.ReadJournalQueries.journalRowByPersistenceIds")
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ class DefaultJournalDao(

override def delete(persistenceId: String, maxSequenceNr: Long): Future[Unit] = {
val actions: DBIOAction[Unit, NoStream, Effect.Write with Effect.Read] = for {
_ <- queries.markJournalMessagesAsDeleted(persistenceId, maxSequenceNr)
highestMarkedSequenceNr <- highestMarkedSequenceNr(persistenceId)
_ <- queries.delete(persistenceId, highestMarkedSequenceNr.getOrElse(0L) - 1)
highestSequenceNr <- queries.highestSequenceNrForPersistenceIdBefore((persistenceId, maxSequenceNr)).result
_ <- queries.delete(persistenceId, highestSequenceNr - 1)
_ <- queries.markJournalMessageAsDeleted(persistenceId, highestSequenceNr)
} yield ()

db.run(actions.transactionally)
Expand All @@ -64,9 +64,6 @@ class DefaultJournalDao(
} yield maybeHighestSeqNo.getOrElse(0L)
}

private def highestMarkedSequenceNr(persistenceId: String) =
queries.highestMarkedSequenceNrForPersistenceId(persistenceId).result

override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = {

def serializeAtomicWrite(aw: AtomicWrite): Try[Seq[(JournalAkkaSerializationRow, Set[String])]] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ class JournalQueries(
private val insertAndReturn = JournalTable.returning(JournalTable.map(_.ordering))
private val TagTableC = Compiled(TagTable)

val highestSequenceNrForPersistenceId = Compiled(_highestSequenceNrForPersistenceId _)
val highestSequenceNrForPersistenceIdBefore = Compiled(_highestSequenceNrForPersistenceIdBefore _)
val messagesQuery = Compiled(_messagesQuery _)

def writeJournalRows(xs: Seq[(JournalAkkaSerializationRow, Set[String])])(
implicit ec: ExecutionContext): DBIOAction[Any, NoStream, Effect.Write] = {
val sorted = xs.sortBy(event => event._1.sequenceNumber)
Expand Down Expand Up @@ -59,49 +63,33 @@ class JournalQueries(
}
}

private def selectAllJournalForPersistenceIdDesc(persistenceId: Rep[String]) =
selectAllJournalForPersistenceId(persistenceId).sortBy(_.sequenceNumber.desc)

private def selectAllJournalForPersistenceId(persistenceId: Rep[String]) =
JournalTable.filter(_.persistenceId === persistenceId).sortBy(_.sequenceNumber.desc)

def delete(persistenceId: String, toSequenceNr: Long) = {
JournalTable.filter(_.persistenceId === persistenceId).filter(_.sequenceNumber <= toSequenceNr).delete
}

def markJournalMessagesAsDeleted(persistenceId: String, maxSequenceNr: Long) =
def markJournalMessageAsDeleted(persistenceId: String, sequenceNr: Long) =
JournalTable
.filter(_.persistenceId === persistenceId)
.filter(_.sequenceNumber <= maxSequenceNr)
.filter(_.sequenceNumber === sequenceNr)
.filter(_.deleted === false)
.map(_.deleted)
.update(true)

private def _highestSequenceNrForPersistenceId(persistenceId: Rep[String]): Rep[Option[Long]] =
selectAllJournalForPersistenceId(persistenceId).take(1).map(_.sequenceNumber).max

private def _highestMarkedSequenceNrForPersistenceId(persistenceId: Rep[String]): Rep[Option[Long]] =
selectAllJournalForPersistenceId(persistenceId).filter(_.deleted === true).take(1).map(_.sequenceNumber).max

val highestSequenceNrForPersistenceId = Compiled(_highestSequenceNrForPersistenceId _)

val highestMarkedSequenceNrForPersistenceId = Compiled(_highestMarkedSequenceNrForPersistenceId _)

private def _selectByPersistenceIdAndMaxSequenceNumber(persistenceId: Rep[String], maxSequenceNr: Rep[Long]) =
selectAllJournalForPersistenceIdDesc(persistenceId).filter(_.sequenceNumber <= maxSequenceNr)

val selectByPersistenceIdAndMaxSequenceNumber = Compiled(_selectByPersistenceIdAndMaxSequenceNumber _)

private def _allPersistenceIdsDistinct: Query[Rep[String], String, Seq] =
JournalTable.map(_.persistenceId).distinct

val allPersistenceIdsDistinct = Compiled(_allPersistenceIdsDistinct)

def journalRowByPersistenceIds(persistenceIds: Iterable[String]): Query[Rep[String], String, Seq] =
for {
query <- JournalTable.map(_.persistenceId)
if query.inSetBind(persistenceIds)
} yield query
private def _highestSequenceNrForPersistenceIdBefore(
persistenceId: Rep[String],
maxSequenceNr: Rep[Long]): Rep[Long] =
selectAllJournalForPersistenceId(persistenceId)
.filter(_.sequenceNumber <= maxSequenceNr)
.take(1)
.map(_.sequenceNumber)
.max
.getOrElse(0L)

private def _messagesQuery(
persistenceId: Rep[String],
Expand All @@ -116,6 +104,4 @@ class JournalQueries(
.sortBy(_.sequenceNumber.asc)
.take(max)

val messagesQuery = Compiled(_messagesQuery _)

}
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ trait BaseByteArrayJournalDao
// We should keep journal record with highest sequence number in order to be compliant
// with @see [[akka.persistence.journal.JournalSpec]]
val actions: DBIOAction[Unit, NoStream, Effect.Write with Effect.Read] = for {
_ <- queries.markJournalMessagesAsDeleted(persistenceId, maxSequenceNr)
highestMarkedSequenceNr <- highestMarkedSequenceNr(persistenceId)
_ <- queries.delete(persistenceId, highestMarkedSequenceNr.getOrElse(0L) - 1)
highestSequenceNr <- queries.highestSequenceNrForPersistenceIdBefore((persistenceId, maxSequenceNr)).result
_ <- queries.delete(persistenceId, highestSequenceNr - 1)
_ <- queries.markJournalMessageAsDeleted(persistenceId, highestSequenceNr)
} yield ()

db.run(actions.transactionally)
Expand All @@ -101,9 +101,6 @@ trait BaseByteArrayJournalDao
db.run(queries.update(persistenceId, sequenceNr, serializedRow.message).map(_ => Done))
}

private def highestMarkedSequenceNr(persistenceId: String) =
queries.highestMarkedSequenceNrForPersistenceId(persistenceId).result

override def highestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] =
for {
maybeHighestSeqNo <- db.run(queries.highestSequenceNrForPersistenceId(persistenceId).result)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@ class JournalQueries(val profile: JdbcProfile, override val journalTableCfg: Leg

private val JournalTableC = Compiled(JournalTable)

val highestSequenceNrForPersistenceId = Compiled(_highestSequenceNrForPersistenceId _)
val highestSequenceNrForPersistenceIdBefore = Compiled(_highestSequenceNrForPersistenceIdBefore _)
val messagesQuery = Compiled(_messagesQuery _)

def writeJournalRows(xs: Seq[JournalRow]) =
JournalTableC ++= xs.sortBy(_.sequenceNumber)

private def selectAllJournalForPersistenceIdDesc(persistenceId: Rep[String]) =
selectAllJournalForPersistenceId(persistenceId).sortBy(_.sequenceNumber.desc)

private def selectAllJournalForPersistenceId(persistenceId: Rep[String]) =
JournalTable.filter(_.persistenceId === persistenceId).sortBy(_.sequenceNumber.desc)

Expand All @@ -38,39 +39,26 @@ class JournalQueries(val profile: JdbcProfile, override val journalTableCfg: Leg
baseQuery.map(_.message).update(replacement)
}

def markJournalMessagesAsDeleted(persistenceId: String, maxSequenceNr: Long) =
def markJournalMessageAsDeleted(persistenceId: String, sequenceNr: Long) =
JournalTable
.filter(_.persistenceId === persistenceId)
.filter(_.sequenceNumber <= maxSequenceNr)
.filter(_.sequenceNumber === sequenceNr)
.filter(_.deleted === false)
.map(_.deleted)
.update(true)

private def _highestSequenceNrForPersistenceId(persistenceId: Rep[String]): Rep[Option[Long]] =
selectAllJournalForPersistenceId(persistenceId).take(1).map(_.sequenceNumber).max

private def _highestMarkedSequenceNrForPersistenceId(persistenceId: Rep[String]): Rep[Option[Long]] =
selectAllJournalForPersistenceId(persistenceId).filter(_.deleted === true).take(1).map(_.sequenceNumber).max

val highestSequenceNrForPersistenceId = Compiled(_highestSequenceNrForPersistenceId _)

val highestMarkedSequenceNrForPersistenceId = Compiled(_highestMarkedSequenceNrForPersistenceId _)

private def _selectByPersistenceIdAndMaxSequenceNumber(persistenceId: Rep[String], maxSequenceNr: Rep[Long]) =
selectAllJournalForPersistenceIdDesc(persistenceId).filter(_.sequenceNumber <= maxSequenceNr)

val selectByPersistenceIdAndMaxSequenceNumber = Compiled(_selectByPersistenceIdAndMaxSequenceNumber _)

private def _allPersistenceIdsDistinct: Query[Rep[String], String, Seq] =
JournalTable.map(_.persistenceId).distinct

val allPersistenceIdsDistinct = Compiled(_allPersistenceIdsDistinct)

def journalRowByPersistenceIds(persistenceIds: Iterable[String]): Query[Rep[String], String, Seq] =
for {
query <- JournalTable.map(_.persistenceId)
if query.inSetBind(persistenceIds)
} yield query
private def _highestSequenceNrForPersistenceIdBefore(
persistenceId: Rep[String],
maxSequenceNr: Rep[Long]): Rep[Long] =
selectAllJournalForPersistenceId(persistenceId)
.filter(_.sequenceNumber <= maxSequenceNr)
.take(1)
.map(_.sequenceNumber)
.max
.getOrElse(0L)

private def _messagesQuery(
persistenceId: Rep[String],
Expand All @@ -84,7 +72,4 @@ class JournalQueries(val profile: JdbcProfile, override val journalTableCfg: Leg
.filter(_.sequenceNumber <= toSequenceNr)
.sortBy(_.sequenceNumber.asc)
.take(max)

val messagesQuery = Compiled(_messagesQuery _)

}

0 comments on commit 93e83af

Please sign in to comment.