Synchronous write to tag views table (#841)
* Synchronously write tag writes

* Formatting

* More efficient rebuild

* Review feedback

* Remove one last log

* More review feedback

* formatting
chbatey authored Nov 6, 2020
1 parent 5aaea47 commit 71ae933
Showing 9 changed files with 636 additions and 233 deletions.
15 changes: 12 additions & 3 deletions core/src/main/resources/reference.conf
@@ -252,9 +252,18 @@ akka.persistence.cassandra {
# Max time to buffer events for before writing.
# Larger values will increase cassandra write efficiency but increase the delay before
# seeing events in EventsByTag queries.
# Setting this to 0 means that tag writes will be written immediately but will still be asynchronous
# with respect to the PersistentActor's persist call.
flush-interval = 250ms
# Setting this to 0 means that tag writes will be written immediately. Batching will still happen
# as events are buffered while a write is in progress.
flush-interval = 0ms

# Tagged events are written to a separate table after the write to the messages table has completed.
# If the write to the tag_views table fails it is retried. If it hasn't succeeded within this timeout
# then the actor is stopped and the write to the tag_views table is retried when the actor is restarted.
# The default of 4 seconds is greater than a typical write timeout in C* (2 seconds) and less than
# the default eventual consistency delay.
# This behavior is new in 1.0.4; previously the write to the tag_views table was completely asynchronous.
tag-write-timeout = 4s

# Update the tag_scanning table with this interval. Shouldn't be done too often to
# avoid unnecessary load. The tag_scanning table keeps track of a starting point for tag
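A minimal sketch of how these two settings could be overridden and read (not part of this commit; the akka.persistence.cassandra.events-by-tag path and the values are assumptions based on the surrounding config, and only the standard Typesafe Config API is used):

import com.typesafe.config.ConfigFactory
import java.util.concurrent.TimeUnit
import scala.concurrent.duration._

object TagSettingsSketch {
  // Hypothetical override of the two settings discussed above.
  val config = ConfigFactory
    .parseString("""
      akka.persistence.cassandra.events-by-tag {
        flush-interval = 0ms
        tag-write-timeout = 4s
      }
    """)
    .withFallback(ConfigFactory.load())

  private val eventsByTag = config.getConfig("akka.persistence.cassandra.events-by-tag")
  // Durations are read the same way the settings code later in this commit reads them.
  val flushInterval = eventsByTag.getDuration("flush-interval", TimeUnit.MILLISECONDS).millis
  val tagWriteTimeout = eventsByTag.getDuration("tag-write-timeout", TimeUnit.MILLISECONDS).millis
}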
@@ -141,6 +141,8 @@ import com.typesafe.config.Config
case _ => eventsByTagConfig.getDuration("pubsub-notification", TimeUnit.MILLISECONDS).millis
}

val tagWriteTimeout = eventsByTagConfig.getDuration("tag-write-timeout", TimeUnit.MILLISECONDS).millis

val tagWriterSettings = TagWriterSettings(
eventsByTagConfig.getInt("max-message-batch-size"),
eventsByTagConfig.getDuration("flush-interval", TimeUnit.MILLISECONDS).millis,
129 changes: 129 additions & 0 deletions core/src/main/scala/akka/persistence/cassandra/journal/Buffer.scala
@@ -0,0 +1,129 @@
/*
* Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.persistence.cassandra.journal

import akka.annotation.InternalApi
import akka.persistence.cassandra.journal.TagWriter.{ timeUuidOrdering, AwaitingWrite }
import akka.util.{ OptionVal, UUIDComparator }

/**
* INTERNAL API
*
* Buffered events waiting to be written.
* The next batch is maintained in `nextBatch` and will never contain more than `batchSize` events
* or events from different time buckets.
*
* Add events with `add` and then call `shouldWrite()` to see if a batch is ready to be written.
* Once a write is complete call `writeComplete` to discard the events in `nextBatch` and take
* events from `pending` for the next batch.
*/
@InternalApi
private[akka] case class Buffer(
batchSize: Int,
size: Int,
nextBatch: Vector[AwaitingWrite],
pending: Vector[AwaitingWrite],
writeRequired: Boolean) {
require(batchSize > 0)

def isEmpty: Boolean = nextBatch.isEmpty

def nonEmpty: Boolean = nextBatch.nonEmpty

def remove(pid: String): Buffer = {
val (toFilter, without) = nextBatch.partition(_.events.head._1.persistenceId == pid)
val filteredPending = pending.filterNot(_.events.head._1.persistenceId == pid)
val removed = toFilter.foldLeft(0)((acc, next) => acc + next.events.size)
copy(size = size - removed, nextBatch = without, pending = filteredPending)
}

/**
* Any time a new time bucket is received or the max batch size is reached,
* a write should happen.
*/
def shouldWrite(): Boolean = {
if (!writeRequired)
require(size <= batchSize)
writeRequired
}

final def add(write: AwaitingWrite): Buffer = {
val firstTimeBucket = write.events.head._1.timeBucket
val lastTimeBucket = write.events.last._1.timeBucket
if (firstTimeBucket != lastTimeBucket) {
// this write needs to be broken up as it spans multiple time buckets
val (first, rest) = write.events.partition {
case (serialized, _) => serialized.timeBucket == firstTimeBucket
}
add(AwaitingWrite(first, OptionVal.None)).add(AwaitingWrite(rest, write.ack))
} else {
// common case
val newSize = size + write.events.size
if (writeRequired) {
// add them to pending, any time bucket changes will be detected later
copy(size = newSize, pending = pending :+ write)
} else if (nextBatch.headOption.exists(oldestEvent =>
UUIDComparator.comparator
.compare(write.events.head._1.timeUuid, oldestEvent.events.head._1.timeUuid) < 0)) {
// rare case where events have been received out of order, just re-build the buffer
require(pending.isEmpty)
val allWrites = (nextBatch :+ write).sortBy(_.events.head._1.timeUuid)(timeUuidOrdering)
rebuild(allWrites)
} else if (nextBatch.headOption.exists(_.events.head._1.timeBucket != write.events.head._1.timeBucket)) {
// time bucket has changed
copy(size = newSize, pending = pending :+ write, writeRequired = true)
} else if (newSize >= batchSize) {
require(pending.isEmpty, "Pending should be empty if write not required")
// does the new write need to be broken up?
if (newSize > batchSize) {
val toAdd = batchSize - size
val (forNextWrite, forPending) = write.events.splitAt(toAdd)
copy(
size = newSize,
nextBatch = nextBatch :+ AwaitingWrite(forNextWrite, OptionVal.None),
pending = Vector(AwaitingWrite(forPending, write.ack)),
writeRequired = true)
} else {
copy(size = newSize, nextBatch = nextBatch :+ write, writeRequired = true)
}
} else {
copy(size = size + write.events.size, nextBatch = nextBatch :+ write)
}
}
}

private def rebuild(writes: Vector[AwaitingWrite]): Buffer = {
var buffer = Buffer.empty(batchSize)
var i = 0
while (!buffer.shouldWrite() && i < writes.size) {
buffer = buffer.add(writes(i))
i += 1
}
// pending may have one in it as the last one may have been a time bucket change rather than the batch being full
val done = buffer.copy(pending = buffer.pending ++ writes.drop(i))
done
}

final def addPending(write: AwaitingWrite): Buffer = {
copy(size = size + write.events.size, pending = pending :+ write)
}

def writeComplete(): Buffer = {
// this could be more efficient by only adding pending writes until a write is required, but this is
// simpler and pending is expected to be small unless the database is falling behind
rebuild(pending)
}
}

/**
* INTERNAL API
*/
@InternalApi
private[akka] object Buffer {
def empty(batchSize: Int): Buffer = {
require(batchSize > 0)
Buffer(batchSize, 0, Vector.empty, Vector.empty, writeRequired = false)
}
}
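
A minimal sketch of the add/shouldWrite/writeComplete cycle described in the class doc above (hypothetical driver code, not part of this commit; incomingWrites and writeBatchToTagViews are assumptions standing in for the TagWriter's plumbing):

package akka.persistence.cassandra.journal

import akka.persistence.cassandra.journal.TagWriter.AwaitingWrite

object BufferLifecycleSketch {
  // Hypothetical driver showing the order in which a TagWriter is expected to drive Buffer.
  def drain(incomingWrites: Vector[AwaitingWrite], batchSize: Int)(
      writeBatchToTagViews: Vector[AwaitingWrite] => Unit): Buffer = {
    var buffer = Buffer.empty(batchSize)
    incomingWrites.foreach { write =>
      buffer = buffer.add(write)
      while (buffer.shouldWrite()) {
        writeBatchToTagViews(buffer.nextBatch) // at most batchSize events, all from one time bucket
        buffer = buffer.writeComplete() // drop nextBatch and promote pending into the next batch
      }
    }
    buffer
  }
}

In the real TagWriter the write to tag_views is asynchronous and writeComplete() is only called after it succeeds; the sketch only illustrates the ordering of the calls.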
@@ -13,7 +13,7 @@ import akka.actor.SupervisorStrategy.Stop
import akka.actor._
import akka.annotation.InternalApi
import akka.event.{ Logging, LoggingAdapter }
import akka.pattern.pipe
import akka.pattern.{ ask, pipe }
import akka.persistence._
import akka.persistence.cassandra._
import akka.persistence.cassandra.Extractors
@@ -26,7 +26,7 @@ import akka.serialization.{ AsyncSerializer, Serialization, SerializationExtensi
import akka.stream.alpakka.cassandra.scaladsl.{ CassandraSession, CassandraSessionRegistry }
import akka.stream.scaladsl.Sink
import akka.dispatch.ExecutionContexts
import akka.util.OptionVal
import akka.util.{ OptionVal, Timeout }
import com.datastax.oss.driver.api.core.cql._
import com.typesafe.config.Config
import com.datastax.oss.driver.api.core.uuid.Uuids
@@ -94,7 +94,6 @@ import akka.stream.scaladsl.Source
settings.journalSettings.readProfile,
taggedPreparedStatements)

// TODO move all tag related things into a class that no-ops to remove these options
private val tagWrites: Option[ActorRef] =
if (settings.eventsByTagSettings.eventsByTagEnabled)
Some(
@@ -217,11 +216,11 @@ import akka.stream.scaladsl.Source
result
.flatMap(_ => deleteDeletedToSeqNr(persistenceId))
.flatMap(_ => deleteFromAllPersistenceIds(persistenceId))
else result.map(_ => Done)
else result.map(_ => Done)(ExecutionContexts.parasitic)
result2.pipeTo(sender())

case HealthCheckQuery =>
session.selectOne(healthCheckCql).map(_ => HealthCheckResponse).pipeTo(sender)
session.selectOne(healthCheckCql).map(_ => HealthCheckResponse)(ExecutionContexts.parasitic).pipeTo(sender)
}

override def asyncWriteMessages(messages: Seq[AtomicWrite]): Future[Seq[Try[Unit]]] = {
@@ -280,9 +279,16 @@ import akka.stream.scaladsl.Source

rec(groups)
}
result.map { _ =>
tagWrites.foreach(_ ! extractTagWrites(serialized))
Nil

// The tag writer keeps retrying, but will drop buffered writes for a persistent actor when that
// actor restarts due to this ask failing
result.flatMap { _ =>
tagWrites match {
case Some(t) =>
implicit val timeout: Timeout = Timeout(settings.eventsByTagSettings.tagWriteTimeout)
t.ask(extractTagWrites(serialized)).map(_ => Nil)(ExecutionContexts.parasitic)
case None => Future.successful(Nil)
}
}

}
@@ -538,7 +544,7 @@ import akka.stream.scaladsl.Source
e.getClass.getName,
e.getMessage)
}
deleteResult.map(_ => Done)
deleteResult.map(_ => Done)(ExecutionContexts.parasitic)
}
}
}
@@ -578,7 +584,7 @@ import akka.stream.scaladsl.Source
}
})
})))
deleteResult.map(_ => Done)
deleteResult.map(_ => Done)(ExecutionContexts.parasitic)
}

// Deletes the events by inserting into the metadata table deleted_to and physically deletes the rows.
@@ -836,7 +842,10 @@ import akka.stream.scaladsl.Source
writerUuid: String,
meta: Option[SerializedMeta],
timeUuid: UUID,
timeBucket: TimeBucket)
timeBucket: TimeBucket) {
// never log serialized byte buffer
override def toString: String = s"Serialized($persistenceId, $sequenceNr, $timeBucket)"
}

private[akka] case class SerializedMeta(serialized: ByteBuffer, serManifest: String, serId: Int)

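
For illustration only: the behavioural core of this change is that the journal now waits for the TagWrites actor to acknowledge the tagged events, bounded by tag-write-timeout, which reduces to an ask with a timeout. In the sketch below, tagWriterRef, the untyped message and the 4 second value are assumptions standing in for the real plumbing:

import akka.actor.ActorRef
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.duration._
import scala.util.Try

object TagWriteAckSketch {
  // Sketch only: tagWriterRef stands in for the TagWrites actor created by the journal,
  // and the real code uses a parasitic (calling-thread) execution context for the map.
  def awaitTagWrites(tagWriterRef: ActorRef, tagWrites: AnyRef)(
      implicit ec: ExecutionContext): Future[Seq[Try[Unit]]] = {
    implicit val timeout: Timeout = Timeout(4.seconds) // settings.eventsByTagSettings.tagWriteTimeout
    // If no acknowledgement arrives within the timeout the future fails, which fails
    // asyncWriteMessages; the persistent actor is then stopped and the tag_views write is
    // retried when it is restarted (see the reference.conf comment earlier in this commit).
    tagWriterRef.ask(tagWrites).map(_ => Nil)
  }
}

Compared with the previous fire-and-forget tell, a lost or slow tag write now surfaces as a failed persist rather than a silent gap in the tag_views table.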