diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf index dce08d781..2abd09229 100644 --- a/core/src/main/resources/reference.conf +++ b/core/src/main/resources/reference.conf @@ -252,9 +252,18 @@ akka.persistence.cassandra { # Max time to buffer events for before writing. # Larger values will increase cassandra write efficiency but increase the delay before # seeing events in EventsByTag queries. - # Setting this to 0 means that tag writes will be written immediately but will still be asynchronous - # with respect to the PersistentActor's persist call. - flush-interval = 250ms + # Setting this to 0 means that tag writes will be written immediately. Batching will still happen + # as events are buffered while a write is in progress. + flush-interval = 0ms + + # Tagged events are written to a separate table after the write to the messages table has completed. + # If the write to the tag_views table fails it is retried. If it has not succeeded within this timeout + # then the actor is stopped, and the write to the tag_views table is retried again when the actor + # is restarted. + # The default of 4 seconds is greater than a typical write timeout in C* (2 seconds) and less than + # the default eventual consistency delay. + # This behavior is new in 1.0.4; before that the write to tag_views was completely asynchronous. + tag-write-timeout = 4s # Update the tag_scanning table with this interval. Shouldn't be done too often to # avoid unecessary load. The tag_scanning table keeps track of a starting point for tag diff --git a/core/src/main/scala/akka/persistence/cassandra/EventsByTagSettings.scala b/core/src/main/scala/akka/persistence/cassandra/EventsByTagSettings.scala index 3d8e54aff..27b59bb09 100644 --- a/core/src/main/scala/akka/persistence/cassandra/EventsByTagSettings.scala +++ b/core/src/main/scala/akka/persistence/cassandra/EventsByTagSettings.scala @@ -141,6 +141,8 @@ import com.typesafe.config.Config case _ => eventsByTagConfig.getDuration("pubsub-notification", TimeUnit.MILLISECONDS).millis } + val tagWriteTimeout = eventsByTagConfig.getDuration("tag-write-timeout", TimeUnit.MILLISECONDS).millis + val tagWriterSettings = TagWriterSettings( eventsByTagConfig.getInt("max-message-batch-size"), eventsByTagConfig.getDuration("flush-interval", TimeUnit.MILLISECONDS).millis, diff --git a/core/src/main/scala/akka/persistence/cassandra/journal/Buffer.scala b/core/src/main/scala/akka/persistence/cassandra/journal/Buffer.scala new file mode 100644 index 000000000..0e8d2c831 --- /dev/null +++ b/core/src/main/scala/akka/persistence/cassandra/journal/Buffer.scala @@ -0,0 +1,129 @@ +/* + * Copyright (C) 2016-2020 Lightbend Inc. + */ + +package akka.persistence.cassandra.journal + +import akka.annotation.InternalApi +import akka.persistence.cassandra.journal.TagWriter.{ timeUuidOrdering, AwaitingWrite } +import akka.util.{ OptionVal, UUIDComparator } + +/** + * INTERNAL API + * + * Buffered events waiting to be written. + * The next batch is maintained in `nextBatch` and will never contain more than `batchSize` + * events or events from different time buckets. + * + * Add events, then call `shouldWrite()` to see if a batch is ready to be written. + * Once a write is complete call `writeComplete` to discard the events in `nextBatch` and take + * events from `pending` for the next batch.
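+ * + * A minimal usage sketch (hypothetical, for illustration; `someWrite` stands in for a real `AwaitingWrite`): + * {{{ + * var buffer = Buffer.empty(batchSize = 64).add(someWrite) + * if (buffer.shouldWrite()) { + * // persist buffer.nextBatch to tag_views, then: + * buffer = buffer.writeComplete() + * } + * }}}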
+ */ +@InternalApi +private[akka] case class Buffer( + batchSize: Int, + size: Int, + nextBatch: Vector[AwaitingWrite], + pending: Vector[AwaitingWrite], + writeRequired: Boolean) { + require(batchSize > 0) + + def isEmpty: Boolean = nextBatch.isEmpty + + def nonEmpty: Boolean = nextBatch.nonEmpty + + def remove(pid: String): Buffer = { + val (toFilter, without) = nextBatch.partition(_.events.head._1.persistenceId == pid) + val filteredPending = pending.filterNot(_.events.head._1.persistenceId == pid) + val removed = toFilter.foldLeft(0)((acc, next) => acc + next.events.size) + copy(size = size - removed, nextBatch = without, pending = filteredPending) + } + + /** + * Any time a new time bucket is received or the max batch size is reached, + * a write should happen. + */ + def shouldWrite(): Boolean = { + if (!writeRequired) + require(size <= batchSize) + writeRequired + } + + final def add(write: AwaitingWrite): Buffer = { + val firstTimeBucket = write.events.head._1.timeBucket + val lastTimeBucket = write.events.last._1.timeBucket + if (firstTimeBucket != lastTimeBucket) { + // this write needs to be broken up as it spans multiple time buckets + val (first, rest) = write.events.partition { + case (serialized, _) => serialized.timeBucket == firstTimeBucket + } + add(AwaitingWrite(first, OptionVal.None)).add(AwaitingWrite(rest, write.ack)) + } else { + // common case + val newSize = size + write.events.size + if (writeRequired) { + // add them to pending, any time bucket changes will be detected later + copy(size = newSize, pending = pending :+ write) + } else if (nextBatch.headOption.exists(oldestEvent => + UUIDComparator.comparator + .compare(write.events.head._1.timeUuid, oldestEvent.events.head._1.timeUuid) < 0)) { + // rare case where events have been received out of order, just re-build the buffer + require(pending.isEmpty) + val allWrites = (nextBatch :+ write).sortBy(_.events.head._1.timeUuid)(timeUuidOrdering) + rebuild(allWrites) + } else if (nextBatch.headOption.exists(_.events.head._1.timeBucket != write.events.head._1.timeBucket)) { + // time bucket has changed + copy(size = newSize, pending = pending :+ write, writeRequired = true) + } else if (newSize >= batchSize) { + require(pending.isEmpty, "Pending should be empty if write not required") + // does the new write need to be broken up?
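+ // e.g. with batchSize = 2 and size = 1, a write of 3 events contributes 1 event to complete + // nextBatch while the remaining 2 (which carry the ack) go to pending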
+ if (newSize > batchSize) { + val toAdd = batchSize - size + val (forNextWrite, forPending) = write.events.splitAt(toAdd) + copy( + size = newSize, + nextBatch = nextBatch :+ AwaitingWrite(forNextWrite, OptionVal.None), + pending = Vector(AwaitingWrite(forPending, write.ack)), + writeRequired = true) + } else { + copy(size = newSize, nextBatch = nextBatch :+ write, writeRequired = true) + } + } else { + copy(size = size + write.events.size, nextBatch = nextBatch :+ write) + } + } + } + + private def rebuild(writes: Vector[AwaitingWrite]): Buffer = { + var buffer = Buffer.empty(batchSize) + var i = 0 + while (!buffer.shouldWrite() && i < writes.size) { + buffer = buffer.add(writes(i)) + i += 1 + } + // pending may have one in it as the last one may have been a time bucket change rather than the batch being full + val done = buffer.copy(pending = buffer.pending ++ writes.drop(i)) + done + } + + final def addPending(write: AwaitingWrite): Buffer = { + copy(size = size + write.events.size, pending = pending :+ write) + } + + def writeComplete(): Buffer = { + // this could be more efficient by adding until a write is required but this is simpler and + // pending is expected to be small unless the database is falling behind + rebuild(pending) + } +} + +/** + * INTERNAL API + */ +@InternalApi +private[akka] object Buffer { + def empty(batchSize: Int): Buffer = { + require(batchSize > 0) + Buffer(batchSize, 0, Vector.empty, Vector.empty, writeRequired = false) + } +} diff --git a/core/src/main/scala/akka/persistence/cassandra/journal/CassandraJournal.scala b/core/src/main/scala/akka/persistence/cassandra/journal/CassandraJournal.scala index 12e0a405b..128c458ee 100644 --- a/core/src/main/scala/akka/persistence/cassandra/journal/CassandraJournal.scala +++ b/core/src/main/scala/akka/persistence/cassandra/journal/CassandraJournal.scala @@ -13,7 +13,7 @@ import akka.actor.SupervisorStrategy.Stop import akka.actor._ import akka.annotation.InternalApi import akka.event.{ Logging, LoggingAdapter } -import akka.pattern.pipe +import akka.pattern.{ ask, pipe } import akka.persistence._ import akka.persistence.cassandra._ import akka.persistence.cassandra.Extractors @@ -26,7 +26,7 @@ import akka.serialization.{ AsyncSerializer, Serialization, SerializationExtensi import akka.stream.alpakka.cassandra.scaladsl.{ CassandraSession, CassandraSessionRegistry } import akka.stream.scaladsl.Sink import akka.dispatch.ExecutionContexts -import akka.util.OptionVal +import akka.util.{ OptionVal, Timeout } import com.datastax.oss.driver.api.core.cql._ import com.typesafe.config.Config import com.datastax.oss.driver.api.core.uuid.Uuids @@ -94,7 +94,6 @@ import akka.stream.scaladsl.Source settings.journalSettings.readProfile, taggedPreparedStatements) - // TODO move all tag related things into a class that no-ops to remove these options private val tagWrites: Option[ActorRef] = if (settings.eventsByTagSettings.eventsByTagEnabled) Some( @@ -217,11 +216,11 @@ import akka.stream.scaladsl.Source result .flatMap(_ => deleteDeletedToSeqNr(persistenceId)) .flatMap(_ => deleteFromAllPersistenceIds(persistenceId)) - else result.map(_ => Done) + else result.map(_ => Done)(ExecutionContexts.parasitic) result2.pipeTo(sender()) case HealthCheckQuery => - session.selectOne(healthCheckCql).map(_ => HealthCheckResponse).pipeTo(sender) + session.selectOne(healthCheckCql).map(_ => HealthCheckResponse)(ExecutionContexts.parasitic).pipeTo(sender) } override def asyncWriteMessages(messages: Seq[AtomicWrite]): Future[Seq[Try[Unit]]] = { @@ -280,9 +279,16
@@ import akka.stream.scaladsl.Source rec(groups) } - result.map { _ => - tagWrites.foreach(_ ! extractTagWrites(serialized)) - Nil + + // The tag writer keeps retrying, but buffered writes for a persistent actor are dropped when the + // actor restarts after this ask fails + result.flatMap { _ => + tagWrites match { + case Some(t) => + implicit val timeout: Timeout = Timeout(settings.eventsByTagSettings.tagWriteTimeout) + t.ask(extractTagWrites(serialized)).map(_ => Nil)(ExecutionContexts.parasitic) + case None => Future.successful(Nil) + } } } @@ -538,7 +544,7 @@ import akka.stream.scaladsl.Source e.getClass.getName, e.getMessage) } - deleteResult.map(_ => Done) + deleteResult.map(_ => Done)(ExecutionContexts.parasitic) } } } @@ -578,7 +584,7 @@ import akka.stream.scaladsl.Source } }) }))) - deleteResult.map(_ => Done) + deleteResult.map(_ => Done)(ExecutionContexts.parasitic) } // Deletes the events by inserting into the metadata table deleted_to and physically deletes the rows. @@ -836,7 +842,10 @@ import akka.stream.scaladsl.Source writerUuid: String, meta: Option[SerializedMeta], timeUuid: UUID, - timeBucket: TimeBucket) + timeBucket: TimeBucket) { + // never log serialized byte buffer + override def toString: PersistenceId = s"Serialized($persistenceId, $sequenceNr, $timeBucket)" + } private[akka] case class SerializedMeta(serialized: ByteBuffer, serManifest: String, serId: Int) diff --git a/core/src/main/scala/akka/persistence/cassandra/journal/TagWriter.scala b/core/src/main/scala/akka/persistence/cassandra/journal/TagWriter.scala index 8d9c306d9..c74630a69 100644 --- a/core/src/main/scala/akka/persistence/cassandra/journal/TagWriter.scala +++ b/core/src/main/scala/akka/persistence/cassandra/journal/TagWriter.scala @@ -6,21 +6,21 @@ package akka.persistence.cassandra.journal import java.util.UUID -import akka.actor.{ Actor, ActorLogging, ActorRef, NoSerializationVerificationNeeded, Props, Timers } +import akka.Done +import akka.actor.{ Actor, ActorLogging, ActorRef, NoSerializationVerificationNeeded, Props, ReceiveTimeout, Timers } import akka.annotation.InternalApi import akka.cluster.pubsub.{ DistributedPubSub, DistributedPubSubMediator } +import akka.event.LoggingAdapter import akka.pattern.pipe import akka.persistence.cassandra.formatOffset import akka.persistence.cassandra.journal.CassandraJournal._ import akka.persistence.cassandra.journal.TagWriter.TagWriterSettings import akka.persistence.cassandra.journal.TagWriters.TagWritersSession +import akka.util.{ OptionVal, UUIDComparator } -import scala.concurrent.duration.{ Duration, FiniteDuration } +import scala.concurrent.duration.{ Duration, FiniteDuration, _ } import scala.util.control.NonFatal import scala.util.{ Failure, Success, Try } -import scala.concurrent.duration._ -import akka.actor.ReceiveTimeout -import akka.util.UUIDComparator /* * INTERNAL API @@ -83,15 +83,22 @@ import akka.util.UUIDComparator private case object FlushKey sealed trait TagWriteFinished final case class TagWriteDone(summary: TagWriteSummary, doneNotify: Option[ActorRef]) extends TagWriteFinished - private final case class TagWriteFailed(reason: Throwable, failedEvents: Vector[(Serialized, TagPidSequenceNr)]) + private final case class TagWriteFailed(reason: Throwable, failedEvents: Vector[AwaitingWrite]) extends TagWriteFinished private[akka] case class DropState(pid: PersistenceId) - val timeUuidOrdering = new Ordering[UUID] { + val timeUuidOrdering: Ordering[UUID] = new Ordering[UUID] { override def compare(x: UUID, y: UUID) = UUIDComparator.comparator.compare(x,
y) } + + /** + * The only reason ack is None is if a TagWrite needed to be broken up because the events were + * from different time buckets or if a single write exceeds the batch size. + * In that case the later AwaitingWrite contains the ack. + */ + case class AwaitingWrite(events: Seq[(Serialized, TagPidSequenceNr)], ack: OptionVal[ActorRef]) } /** INTERNAL API */ @@ -107,11 +114,10 @@ import akka.util.UUIDComparator import TagWriter._ import TagWriters.TagWrite - import context.become - import context.dispatcher + import context.{ become, dispatcher } // eager init and val because used from Future callbacks - override val log = super.log + override val log: LoggingAdapter = super.log private val pubsub: Option[ActorRef] = { settings.pubsubNotification match { @@ -134,40 +140,37 @@ import akka.util.UUIDComparator } override def receive: Receive = - idle(Vector.empty[(Serialized, TagPidSequenceNr)], Map.empty[String, Long]) + idle(Buffer.empty(settings.maxBatchSize), Map.empty[String, Long]) - private def idle( - buffer: Vector[(Serialized, TagPidSequenceNr)], - tagPidSequenceNrs: Map[PersistenceId, TagPidSequenceNr]): Receive = { + private def idle(buffer: Buffer, tagPidSequenceNrs: Map[PersistenceId, TagPidSequenceNr]): Receive = { case DropState(pid) => log.debug("Dropping state for pid: {}", pid) - context.become(idle(buffer, tagPidSequenceNrs - pid)) + context.become(idle(buffer.remove(pid), tagPidSequenceNrs - pid)) case InternalFlush => log.debug("Flushing") if (buffer.nonEmpty) { - write(buffer.take(settings.maxBatchSize), buffer.drop(settings.maxBatchSize), tagPidSequenceNrs, None) + write(buffer, tagPidSequenceNrs, None) } case Flush => if (buffer.nonEmpty) { - // TODO this should br broken into batches https://github.com/akka/akka-persistence-cassandra/issues/405 log.debug("External flush request from [{}]. Flushing.", sender()) - write(buffer, Vector.empty[(Serialized, TagPidSequenceNr)], tagPidSequenceNrs, Some(sender())) + write(buffer, tagPidSequenceNrs, Some(sender())) } else { log.debug("External flush request from [{}], buffer empty.", sender()) sender() ! FlushComplete } case TagWrite(_, payload, _) => - // TODO keeping this sorted is over kill. We only need to know if a new timebucket is - // reached to force a flush or that the batch size is met - val (newTagPidSequenceNrs, events) = + val (newTagPidSequenceNrs, events: Seq[(Serialized, TagPidSequenceNr)]) = { assignTagPidSequenceNumbers(payload.toVector, tagPidSequenceNrs) - val newBuffer = (buffer ++ events).sortBy(_._1.timeUuid)(timeUuidOrdering) + } + val newWrite = AwaitingWrite(events, OptionVal(sender())) + val newBuffer = buffer.add(newWrite) flushIfRequired(newBuffer, newTagPidSequenceNrs) case twd: TagWriteDone => log.error("Received Done when in idle state. This is a bug. Please report with DEBUG logs: {}", twd) case ResetPersistenceId(_, tp @ TagProgress(pid, _, tagPidSequenceNr)) => log.debug("Resetting pid {}. TagProgress {}", pid, tp) - become(idle(buffer.filterNot(_._1.persistenceId == pid), tagPidSequenceNrs + (pid -> tagPidSequenceNr))) + become(idle(buffer.remove(pid), tagPidSequenceNrs + (pid -> tagPidSequenceNr))) sender() ! 
ResetPersistenceIdComplete case ReceiveTimeout => @@ -182,12 +185,12 @@ import akka.util.UUIDComparator } private def writeInProgress( - buffer: Vector[(Serialized, TagPidSequenceNr)], + buffer: Buffer, tagPidSequenceNrs: Map[PersistenceId, TagPidSequenceNr], awaitingFlush: Option[ActorRef]): Receive = { case DropState(pid) => log.debug("Dropping state for pid: [{}]", pid) - become(writeInProgress(buffer, tagPidSequenceNrs - pid, awaitingFlush)) + become(writeInProgress(buffer.remove(pid), tagPidSequenceNrs - pid, awaitingFlush)) case InternalFlush => // Ignore, we will check when the write is done case Flush => @@ -196,6 +199,7 @@ import akka.util.UUIDComparator case TagWrite(_, payload, _) => val (updatedTagPidSequenceNrs, events) = assignTagPidSequenceNumbers(payload.toVector, tagPidSequenceNrs) + val awaitingWrite = AwaitingWrite(events, OptionVal(sender())) val now = System.nanoTime() if (buffer.size > (4 * settings.maxBatchSize) && now > (lastLoggedBufferNs + bufferWarningMinDurationNs)) { lastLoggedBufferNs = now @@ -203,14 +207,21 @@ import akka.util.UUIDComparator "Buffer for tagged events is getting too large ({}), is Cassandra responsive? Are writes failing? " + "If events are buffered for longer than the eventual-consistency-delay they won't be picked up by live queries. The oldest event in the buffer is offset: {}", buffer.size, - formatOffset(buffer.head._1.timeUuid)) + formatOffset(buffer.nextBatch.head.events.head._1.timeUuid)) } // buffer until current query is finished // Don't sort until the write has finished - become(writeInProgress(buffer ++ events, updatedTagPidSequenceNrs, awaitingFlush)) + val newBuffer = buffer.addPending(awaitingWrite) + become(writeInProgress(newBuffer, updatedTagPidSequenceNrs, awaitingFlush)) case TagWriteDone(summary, doneNotify) => - val sortedBuffer = buffer.sortBy(_._1.timeUuid)(timeUuidOrdering) log.debug("Tag write done: {}", summary) + val nextBuffer = buffer.writeComplete() + buffer.nextBatch.foreach { write => + write.ack match { + case OptionVal.None => + case OptionVal.Some(ref) => ref ! Done + } + } summary.foreach { case (id, PidProgress(_, seqNrTo, tagPidSequenceNr, offset)) => // These writes do not block future writes. We don't read the tag progress again from C* @@ -233,15 +244,14 @@ import akka.util.UUIDComparator awaitingFlush match { case Some(replyTo) => log.debug("External flush request") - if (sortedBuffer.nonEmpty) { - // TODO break into batches - write(sortedBuffer, Vector.empty[(Serialized, TagPidSequenceNr)], tagPidSequenceNrs, awaitingFlush) + if (buffer.pending.nonEmpty) { + write(nextBuffer, tagPidSequenceNrs, awaitingFlush) } else { replyTo ! FlushComplete - context.become(idle(sortedBuffer, tagPidSequenceNrs)) + context.become(idle(nextBuffer, tagPidSequenceNrs)) } case None => - flushIfRequired(sortedBuffer, tagPidSequenceNrs) + flushIfRequired(nextBuffer, tagPidSequenceNrs) } sendPubsubNotification() doneNotify.foreach(_ ! FlushComplete) @@ -253,15 +263,11 @@ import akka.util.UUIDComparator t) timers.startSingleTimer(FlushKey, InternalFlush, settings.flushInterval) parent ! TagWriters.TagWriteFailed(t) - context.become(idle(events ++ buffer, tagPidSequenceNrs)) + context.become(idle(buffer, tagPidSequenceNrs)) case ResetPersistenceId(_, tp @ TagProgress(pid, _, _)) => log.debug("Resetting persistence id {}. 
TagProgress {}", pid, tp) - become( - writeInProgress( - buffer.filterNot(_._1.persistenceId == pid), - tagPidSequenceNrs + (pid -> tp.pidTagSequenceNr), - awaitingFlush)) + become(writeInProgress(buffer.remove(pid), tagPidSequenceNrs + (pid -> tp.pidTagSequenceNr), awaitingFlush)) sender() ! ResetPersistenceIdComplete case ReceiveTimeout => @@ -278,36 +284,17 @@ import akka.util.UUIDComparator } } - private def flushIfRequired(buffer: Vector[(Serialized, TagPidSequenceNr)], tagSequenceNrs: Map[String, Long]): Unit = + private def flushIfRequired(buffer: Buffer, tagSequenceNrs: Map[String, Long]): Unit = { if (buffer.isEmpty) { context.become(idle(buffer, tagSequenceNrs)) - } else if (buffer.head._1.timeBucket < buffer.last._1.timeBucket) { - val (currentBucket, rest) = - buffer.span(_._1.timeBucket == buffer.head._1.timeBucket) - if (log.isDebugEnabled) { - log.debug( - "Switching time buckets: head: {} last: {}. Number in current bucket: {}", - buffer.head._1.timeBucket, - buffer.last._1.timeBucket, - currentBucket.size) - } - - if (currentBucket.size > settings.maxBatchSize) { - write(buffer.take(settings.maxBatchSize), buffer.drop(settings.maxBatchSize), tagSequenceNrs, None) - } else { - write(currentBucket, rest, tagSequenceNrs, None) - } - } else if (buffer.size >= settings.maxBatchSize) { - log.debug("Batch size reached. Writing to Cassandra.") - write(buffer.take(settings.maxBatchSize), buffer.drop(settings.maxBatchSize), tagSequenceNrs, None) - } else if (settings.flushInterval == Duration.Zero) { - // Should always be a buffer of 1 - write(buffer, Vector.empty[(Serialized, TagPidSequenceNr)], tagSequenceNrs, None) + } else if (buffer.shouldWrite() || settings.flushInterval == Duration.Zero) { + write(buffer, tagSequenceNrs, None) } else { if (!timers.isTimerActive(FlushKey)) timers.startSingleTimer(FlushKey, InternalFlush, settings.flushInterval) context.become(idle(buffer, tagSequenceNrs)) } + } /** * Defaults to 1 as if recovery for a persistent Actor based its recovery @@ -329,42 +316,46 @@ import akka.util.UUIDComparator * Events should be ordered by sequence nr per pid */ private def write( - events: Vector[(Serialized, TagPidSequenceNr)], - remainingBuffer: Vector[(Serialized, TagPidSequenceNr)], + buffer: Buffer, tagPidSequenceNrs: Map[String, TagPidSequenceNr], notifyWhenDone: Option[ActorRef]): Unit = { - val writeSummary = createTagWriteSummary(events) - log.debug("Starting tag write of {} events. Summary: {}", events.size, writeSummary) - val withFailure = session.writeBatch(tag, events).map(_ => TagWriteDone(writeSummary, notifyWhenDone)).recover { + val writeSummary = createTagWriteSummary(buffer) + log.debug("Starting tag write of {} events. 
Summary: {}", buffer.nextBatch.size, writeSummary) + val withFailure = session.writeBatch(tag, buffer).map(_ => TagWriteDone(writeSummary, notifyWhenDone)).recover { case NonFatal(t) => - TagWriteFailed(t, events) + TagWriteFailed(t, buffer.nextBatch) } import context.dispatcher withFailure.pipeTo(self) // notifyWhenDone is cleared out as it is now in the TagWriteDone - context.become(writeInProgress(remainingBuffer, tagPidSequenceNrs, None)) + context.become(writeInProgress(buffer, tagPidSequenceNrs, None)) } - private def createTagWriteSummary(writes: Seq[(Serialized, TagPidSequenceNr)]): Map[PersistenceId, PidProgress] = - writes.foldLeft(Map.empty[PersistenceId, PidProgress])((acc, next) => { - val (event, tagPidSequenceNr) = next - acc.get(event.persistenceId) match { - case Some(PidProgress(from, to, _, _)) => - if (event.sequenceNr <= to) - throw new IllegalStateException( - s"Expected events to be ordered by seqNr. ${event.persistenceId} " + - s"Events: ${writes.map(e => (e._1.persistenceId, e._1.sequenceNr, e._1.timeUuid))}") - acc + (event.persistenceId -> PidProgress(from, event.sequenceNr, tagPidSequenceNr, event.timeUuid)) - case None => - acc + (event.persistenceId -> PidProgress( - event.sequenceNr, - event.sequenceNr, - tagPidSequenceNr, - event.timeUuid)) - } - }) + private def createTagWriteSummary(writes: Buffer): Map[PersistenceId, PidProgress] = { + writes.nextBatch + .flatten(_.events) + .foldLeft(Map.empty[PersistenceId, PidProgress])((acc, next) => { + val (event, tagPidSequenceNr) = next + acc.get(event.persistenceId) match { + case Some(PidProgress(from, to, _, _)) => + if (event.sequenceNr <= to) + throw new IllegalStateException( + s"Expected events to be ordered by seqNr. ${event.persistenceId} " + + s"Events: ${writes.nextBatch.map(e => + (e.events.head._1.persistenceId, e.events.head._1.sequenceNr, e.events.head._1.timeUuid))}") + + acc + (event.persistenceId -> PidProgress(from, event.sequenceNr, tagPidSequenceNr, event.timeUuid)) + case None => + acc + (event.persistenceId -> PidProgress( + event.sequenceNr, + event.sequenceNr, + tagPidSequenceNr, + event.timeUuid)) + } + }) + } private def assignTagPidSequenceNumbers( events: Vector[Serialized], diff --git a/core/src/main/scala/akka/persistence/cassandra/journal/TagWriters.scala b/core/src/main/scala/akka/persistence/cassandra/journal/TagWriters.scala index 877a82376..e13197113 100644 --- a/core/src/main/scala/akka/persistence/cassandra/journal/TagWriters.scala +++ b/core/src/main/scala/akka/persistence/cassandra/journal/TagWriters.scala @@ -23,6 +23,7 @@ import akka.actor.Props import akka.actor.SupervisorStrategy import akka.actor.Timers import akka.annotation.InternalApi +import akka.dispatch.ExecutionContexts import akka.event.LoggingAdapter import akka.persistence.cassandra.journal.CassandraJournal._ import akka.persistence.cassandra.journal.TagWriter._ @@ -55,7 +56,7 @@ import scala.util.Try session.executeWrite(stmt.setExecutionProfileName(writeProfile)) } - def writeBatch(tag: Tag, events: Seq[(Serialized, Long)])(implicit ec: ExecutionContext): Future[Done] = { + def writeBatch(tag: Tag, events: Buffer)(implicit ec: ExecutionContext): Future[Done] = { val batch = new BatchStatementBuilder(BatchType.UNLOGGED) batch.setExecutionProfileName(writeProfile) val tagWritePSs = for { @@ -66,34 +67,36 @@ import scala.util.Try tagWritePSs .map { case (withMeta, withoutMeta) => - events.foreach { - case (event, pidTagSequenceNr) => - val ps = if (event.meta.isDefined) withMeta else withoutMeta - val bound = 
ps.bind( - tag, - event.timeBucket.key: JLong, - event.timeUuid, - pidTagSequenceNr: JLong, - event.serialized, - event.eventAdapterManifest, - event.persistenceId, - event.sequenceNr: JLong, - event.serId: JInt, - event.serManifest, - event.writerUuid) - - val finished = event.meta match { - case Some(m) => - bound - .setByteBuffer("meta", m.serialized) - .setString("meta_ser_manifest", m.serManifest) - .setInt("meta_ser_id", m.serId) - case None => - bound - } - - // this is a mutable builder - batch.addStatement(finished) + events.nextBatch.foreach { awaitingWrite => + awaitingWrite.events.foreach { + case (event, pidTagSequenceNr) => + val ps = if (event.meta.isDefined) withMeta else withoutMeta + val bound = ps.bind( + tag, + event.timeBucket.key: JLong, + event.timeUuid, + pidTagSequenceNr: JLong, + event.serialized, + event.eventAdapterManifest, + event.persistenceId, + event.sequenceNr: JLong, + event.serId: JInt, + event.serManifest, + event.writerUuid) + + val finished = event.meta match { + case Some(m) => + bound + .setByteBuffer("meta", m.serialized) + .setString("meta_ser_manifest", m.serManifest) + .setInt("meta_ser_id", m.serId) + case None => + bound + } + + // this is a mutable builder + batch.addStatement(finished) + } } batch.build() } @@ -125,6 +128,7 @@ import scala.util.Try /** * All serialised should be for the same persistenceId + * * @param actorRunning migration sends these messages without the actor running so TagWriters should not * validate that the pid is running */ @@ -134,28 +138,33 @@ import scala.util.Try def props(settings: TagWriterSettings, tagWriterSession: TagWritersSession): Props = Props(new TagWriters(settings, tagWriterSession)) - final case class TagFlush(tag: String) final case class FlushAllTagWriters(timeout: Timeout) + case object AllFlushed final case class SetTagProgress(pid: String, tagProgresses: Map[Tag, TagProgress]) + case object TagProcessAck final case class PersistentActorStarting(pid: String, persistentActor: ActorRef) + case object PersistentActorStartingAck final case class TagWriteFailed(reason: Throwable) + private case object WriteTagScanningTick + private case class WriteTagScanningCompleted(result: Try[Done], startTime: Long, size: Int) private case class PersistentActorTerminated(pid: PersistenceId, ref: ActorRef) + private case class TagWriterTerminated(tag: String) /** - * @param message the message to send - * @param tellOrAsk Left for the `sender` of tell, `right` for the promise of ask. + * @param message the message to send */ - private case class PassivateBufferEntry(message: Any, tellOrAsk: Either[ActorRef, Promise[Any]]) + private case class PassivateBufferEntry(message: Any, response: Promise[Any]) + } /** @@ -206,12 +215,13 @@ import scala.util.Try }) } Future.sequence(flushes).map(_ => AllFlushed).pipeTo(replyTo) - case TagFlush(tag) => - tellTagActor(tag, Flush, sender()) case tw: TagWrite => - forwardTagWrite(tw) + // this only comes from the replay, an ack is not required right now. 
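+ // forwardTagWrite asks the tag actor, so the piped Done (or Status.Failure) lets the replay await completion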
+ forwardTagWrite(tw).pipeTo(sender()) case BulkTagWrite(tws, withoutTags) => - tws.foreach(forwardTagWrite) + val replyTo = sender() + val forwards = tws.map(forwardTagWrite) + Future.sequence(forwards).map(_ => Done)(ExecutionContexts.parasitic).pipeTo(replyTo) updatePendingScanning(withoutTags) case WriteTagScanningTick => writeTagScanning() @@ -329,14 +339,15 @@ import scala.util.Try } - private def forwardTagWrite(tw: TagWrite): Unit = { + private def forwardTagWrite(tw: TagWrite): Future[Done] = { if (tw.actorRunning && !currentPersistentActors.contains(tw.serialised.head.persistenceId)) { log.warning( "received TagWrite but actor not active (dropping, will be resolved when actor restarts): [{}]", tw.serialised.head.persistenceId) + Future.successful(Done) } else { updatePendingScanning(tw.serialised) - tellTagActor(tw.tag, tw, sender()) + askTagActor(tw.tag, tw).map(_ => Done)(ExecutionContexts.parasitic) } } @@ -423,20 +434,11 @@ import scala.util.Try TagWriterTerminated(tag)) } - private def tellTagActor(tag: String, message: Any, snd: ActorRef): Unit = { - passivatingTagActors.get(tag) match { - case Some(buffer) => - passivatingTagActors = passivatingTagActors.updated(tag, buffer :+ PassivateBufferEntry(message, Left(snd))) - case None => - tagActor(tag).tell(message, snd) - } - } - private def askTagActor(tag: String, message: Any)(implicit timeout: Timeout): Future[Any] = { passivatingTagActors.get(tag) match { case Some(buffer) => val p = Promise[Any]() - passivatingTagActors = passivatingTagActors.updated(tag, buffer :+ PassivateBufferEntry(message, Right(p))) + passivatingTagActors = passivatingTagActors.updated(tag, buffer :+ PassivateBufferEntry(message, p)) p.future case None => tagActor(tag).ask(message) @@ -474,9 +476,8 @@ import scala.util.Try } private def sendPassivateBuffer(tag: String, buffer: Vector[PassivateBufferEntry]): Unit = { - buffer.foreach { - case PassivateBufferEntry(message, Left(snd)) => tellTagActor(tag, message, snd) - case PassivateBufferEntry(message, Right(promise)) => promise.completeWith(askTagActor(tag, message)) + buffer.foreach { entry => + entry.response.completeWith(askTagActor(tag, entry.message)) } } } diff --git a/core/src/test/scala/akka/persistence/cassandra/journal/BufferSpec.scala b/core/src/test/scala/akka/persistence/cassandra/journal/BufferSpec.scala new file mode 100644 index 000000000..250d9390f --- /dev/null +++ b/core/src/test/scala/akka/persistence/cassandra/journal/BufferSpec.scala @@ -0,0 +1,203 @@ +/* + * Copyright (C) 2016-2020 Lightbend Inc. 
+ */ + +package akka.persistence.cassandra.journal + +import java.nio.ByteBuffer +import java.util.UUID + +import akka.actor.{ ActorRef, ActorSystem } +import akka.persistence.cassandra.Day +import akka.persistence.cassandra.journal.CassandraJournal.{ Serialized, TagPidSequenceNr } +import akka.persistence.cassandra.journal.TagWriter.AwaitingWrite +import akka.testkit.{ TestKit, TestProbe } +import akka.util.OptionVal +import com.datastax.oss.driver.api.core.uuid.Uuids +import org.scalatest.BeforeAndAfterAll +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec + +class BufferSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll { + implicit val system = ActorSystem() + + "Buffer" should { + "not write when empty" in { + Buffer.empty(2).shouldWrite() shouldEqual false + } + "write when batch size met (single AwaitingWrite)" in { + val bucket = nowBucket() + val e1 = event("p1", 1L, "e-1", bucket) + val e2 = event("p1", 2L, "e-2", bucket) + + Buffer.empty(2).add(AwaitingWrite(List((e1, 1), (e2, 2)), OptionVal.None)).shouldWrite() shouldEqual true + } + "write when batch size met (multiple AwaitingWrites)" in { + val bucket = nowBucket() + val e1 = event("p1", 1L, "e-1", bucket) + val e2 = event("p1", 2L, "e-2", bucket) + + val buffer = Buffer + .empty(2) + .add(AwaitingWrite(List((e1, 1)), OptionVal.None)) + .add(AwaitingWrite(List((e2, 12)), OptionVal.None)) + + buffer.shouldWrite() shouldEqual true + buffer.nextBatch shouldEqual (List( + AwaitingWrite(List((e1, 1)), OptionVal.None), + AwaitingWrite(List((e2, 12)), OptionVal.None))) + } + "write when time bucket changes" in { + val bucket = nowBucket() + val e1 = event("p1", 1L, "e-1", bucket) + val e2 = event("p1", 2L, "e-2", bucket.next()) + + Buffer + .empty(10) + .add(AwaitingWrite(List((e1, 1)), OptionVal.None)) + .add(AwaitingWrite(List((e2, 12)), OptionVal.None)) + .shouldWrite() shouldEqual true + } + + "write when time bucket changes in the same add" in { + val bucket = nowBucket() + val e1 = event("p1", 1L, "e-1", bucket) + val e2 = event("p1", 2L, "e-2", bucket.next()) + val sender = TestProbe().ref + + val buffer = Buffer.empty(3).add(aw(sender, (e1, 1), (e2, 2))) // same aw, spanning time buckets + + buffer.writeRequired shouldEqual true + buffer.nextBatch shouldEqual Vector(awNoSender((e1, 1))) + + val nextBuffer = buffer.writeComplete() + nextBuffer.writeRequired shouldEqual false + nextBuffer.nextBatch shouldEqual Vector(aw(sender, (e2, 2))) + + } + "write when multiple time buckets" in { + val bucket = nowBucket() + val e1 = event("p1", 1L, "e-1", bucket) + val e2 = event("p1", 2L, "e-2", bucket.next()) + val e3 = event("p1", 3L, "e-2", bucket.next().next()) + val sender = TestProbe().ref + + val buffer = Buffer.empty(3).add(aw(sender, (e1, 1), (e2, 2), (e3, 3))) // same aw, spanning time buckets + + buffer.writeRequired shouldEqual true + buffer.nextBatch shouldEqual Vector(awNoSender((e1, 1))) + buffer.pending shouldEqual Vector(awNoSender((e2, 2)), aw(sender, (e3, 3))) + + val nextBuffer = buffer.writeComplete() + nextBuffer.writeRequired shouldEqual true + nextBuffer.nextBatch shouldEqual Vector(awNoSender((e2, 2))) + nextBuffer.pending shouldEqual Vector(aw(sender, (e3, 3))) + + val nextNextBuffer = nextBuffer.writeComplete() + nextNextBuffer.writeRequired shouldEqual false + nextNextBuffer.nextBatch shouldEqual Vector(aw(sender, (e3, 3))) + nextNextBuffer.pending shouldEqual Vector() + } + + "break up writes based on batch" in { + val bucket = nowBucket() + val e1 = 
event("p1", 1L, "e-1", bucket) + val e2 = event("p1", 2L, "e-2", bucket) + val e3 = event("p1", 3L, "e-3", bucket) + val e4 = event("p1", 4L, "e-2", bucket) + val sender = TestProbe().ref + + val buffer = Buffer.empty(2).add(aw(sender, (e1, 1), (e2, 2), (e3, 3), (e4, 4))) // same aw, greater than batch size + + buffer.writeRequired shouldEqual true + buffer.nextBatch shouldEqual Vector(awNoSender((e1, 1), (e2, 2))) + + val nextBuffer = buffer.writeComplete() + nextBuffer.nextBatch shouldEqual Vector(aw(sender, (e3, 3), (e4, 4))) + nextBuffer.writeRequired shouldEqual true + + val nextNextBuffer = nextBuffer.writeComplete() + nextNextBuffer.writeRequired shouldEqual false + } + + // there will never be events in the same add that are out of order as they are from the same pid + // but there can be from different adds + "handle out of order writes" in { + val currentBucket = (0 to 2).map { _ => + val uuid = Uuids.timeBased() + (uuid, TimeBucket(uuid, Day)) + } + val sender1 = TestProbe().ref + val sender2 = TestProbe().ref + val sender3 = TestProbe().ref + + val futureBucketMillis = Uuids.unixTimestamp(currentBucket(0)._1) + Day.durationMillis + val futureBucket = TimeBucket(futureBucketMillis, Day) + + val p1e1 = event("p1", 1, "p1-e1", currentBucket(0)._2, uuid = currentBucket(0)._1) + val p2e1 = event("p2", 1, "p2-e1", futureBucket, uuid = Uuids.startOf(futureBucketMillis)) + val p1e2 = event("p1", 2, "p1-e2", currentBucket(1)._2, uuid = currentBucket(1)._1) + + val buffer = Buffer.empty(2).add(aw(sender1, (p1e1, 1))).add(aw(sender2, (p2e1, 1))) + + buffer.shouldWrite() shouldEqual true + buffer.nextBatch shouldEqual Vector(aw(sender1, (p1e1, 1))) + + val nextBuffer = buffer.writeComplete() + nextBuffer.shouldWrite() shouldEqual false + nextBuffer.nextBatch shouldEqual Vector(aw(sender2, (p2e1, 1))) + + // this is from before what is currently in nextWrite, they should be switched around + val nextNextBuffer = nextBuffer.add(aw(sender3, (p1e2, 2))) + nextNextBuffer.shouldWrite() shouldEqual true + nextNextBuffer.nextBatch shouldEqual Vector(aw(sender3, (p1e2, 2))) + + val finalBuffer = nextNextBuffer.writeComplete() + finalBuffer.shouldWrite() shouldEqual false + nextBuffer.nextBatch shouldEqual Vector(aw(sender2, (p2e1, 1))) + + } + + "handle being overloaded" in { + val bucket = nowBucket() + var buffer = Buffer.empty(2) + val totalWrites = 1000000 + for (i <- 1 to totalWrites) { + buffer = buffer.add(awNoSender((event("p1", seqNr = i, payload = "cats", bucket), i))) + } + var writes = 0 + while (buffer.shouldWrite()) { + writes += 1 + buffer = buffer.writeComplete() + } + writes shouldEqual totalWrites / 2 + } + } + + override protected def afterAll(): Unit = { + TestKit.shutdownActorSystem(system) + } + + private def awNoSender(events: (Serialized, TagPidSequenceNr)*): AwaitingWrite = { + AwaitingWrite(events.toList, OptionVal.None) + } + + private def aw(sender: ActorRef, events: (Serialized, TagPidSequenceNr)*): AwaitingWrite = { + AwaitingWrite(events.toList, OptionVal(sender)) + } + + private def event( + pId: String, + seqNr: Long, + payload: String, + bucket: TimeBucket, + tags: Set[String] = Set(), + uuid: UUID = Uuids.timeBased()): Serialized = + Serialized(pId, seqNr, ByteBuffer.wrap(payload.getBytes()), tags, "", "", 1, "", None, uuid, bucket) + + private def nowBucket(): TimeBucket = { + val now = Uuids.timeBased() + TimeBucket(now, Day) + } + +} diff --git a/core/src/test/scala/akka/persistence/cassandra/journal/TagWriterSpec.scala 
b/core/src/test/scala/akka/persistence/cassandra/journal/TagWriterSpec.scala index 11ad4ed31..f077e827c 100644 --- a/core/src/test/scala/akka/persistence/cassandra/journal/TagWriterSpec.scala +++ b/core/src/test/scala/akka/persistence/cassandra/journal/TagWriterSpec.scala @@ -17,31 +17,34 @@ import akka.persistence.cassandra.journal.TagWriters.TagWrite import akka.persistence.cassandra.journal.TagWriterSpec.{ EventWrite, ProgressWrite, TestEx } import akka.persistence.cassandra.formatOffset import akka.persistence.cassandra.journal.TagWriters.TagWritersSession -import akka.testkit.{ ImplicitSender, TestKit, TestProbe } +import akka.testkit.{ TestKit, TestProbe } import com.datastax.oss.driver.api.core.cql.{ PreparedStatement, Statement } import com.datastax.oss.driver.api.core.uuid.Uuids import com.github.ghik.silencer.silent -import com.typesafe.config.ConfigFactory +import com.typesafe.config.{ Config, ConfigFactory } import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach } import org.scalatest.wordspec.AnyWordSpecLike + import scala.concurrent.duration._ import scala.concurrent.{ ExecutionContext, Future, Promise } import scala.util.control.NoStackTrace object TagWriterSpec { - val config = + val config: Config = ConfigFactory.parseString(""" - |akka { - | actor { - | debug { - | # enable function of LoggingReceive, which is to log any received message at - | # DEBUG level - | receive = on - | unhandled = on - | } - | } - |} - """.stripMargin) + akka { + use-slf4j = off + loglevel = DEBUG + actor { + debug { + # enable function of LoggingReceive, which is to log any received message at + # DEBUG level + receive = on + unhandled = on + } + } + } + """) case class ProgressWrite(persistenceId: String, seqNr: Long, tagPidSequenceNr: Long, offset: UUID) case class EventWrite(persistenceId: String, seqNr: Long, tagPidSequenceNr: Long) @@ -58,7 +61,6 @@ class TagWriterSpec extends TestKit(ActorSystem("TagWriterSpec", TagWriterSpec.config)) with AnyWordSpecLike with BeforeAndAfterEach - with ImplicitSender with BeforeAndAfterAll { override protected def afterAll(): Unit = @@ -87,22 +89,31 @@ class TagWriterSpec super.afterEach() } + class Setup { + val sender = TestProbe() + implicit val senderRef: ActorRef = sender.ref + } + "Tag writer batching" must { - "external flush when idle" in { + "external flush when idle" in new Setup { val (probe, ref) = setup(settings = defaultSettings.copy(maxBatchSize = 100, flushInterval = 1.hour)) val bucket = nowBucket() val e1 = event("p1", 1L, "e-1", bucket) - ref ! TagWrite(tagName, Vector(e1)) + ref.tell(TagWrite(tagName, Vector(e1)), sender.ref) probe.expectNoMessage(waitDuration) ref ! Flush probe.expectMsg(Vector(toEw(e1, 1))) probe.expectMsg(ProgressWrite("p1", 1, 1, e1.timeUuid)) - expectMsg(FlushComplete) + sender.expectMsg(Done) + sender.expectMsg(FlushComplete) } - "external flush when write in progress and messages in buffer" in { + "external flush when write in progress and messages in buffer" in new Setup { val promiseForWrite = Promise[Done]() + val sender1 = TestProbe() + val sender2 = TestProbe() + val sender3 = TestProbe() val (probe, ref) = setup( writeResponse = Stream(promiseForWrite.future) ++ Stream.continually(Future.successful(Done)), @@ -113,24 +124,28 @@ class TagWriterSpec val e3 = event("p1", 3L, "e-3", bucket) val e4 = event("p1", 4L, "e-4", bucket) - ref ! 
TagWrite(tagName, Vector(e1, e2)) + ref.tell(TagWrite(tagName, Vector(e1, e2)), sender1.ref) probe.expectMsg(Vector(toEw(e1, 1), toEw(e2, 2))) probe.expectNoMessage(waitDuration) val flushSender = TestProbe("flushSender") - ref ! TagWrite(tagName, Vector(e3)) + ref.tell(TagWrite(tagName, Vector(e3)), sender2.ref) ref.tell(Flush, flushSender.ref) - ref ! TagWrite(tagName, Vector(e4)) // check adding to the buffer while in progress doesn't lose the flush + ref.tell(TagWrite(tagName, Vector(e4)), sender3.ref) // check adding to the buffer while in progress doesn't lose the flush probe.expectNoMessage(waitDuration) + sender2.expectNoMessage(waitDuration) promiseForWrite.success(Done) probe.expectMsg(ProgressWrite("p1", 2, 2, e2.timeUuid)) + sender1.expectMsg(Done) // only happened due to the flush probe.expectMsg(Vector(toEw(e3, 3), toEw(e4, 4))) probe.expectMsg(ProgressWrite("p1", 4, 4, e4.timeUuid)) flushSender.expectMsg(FlushComplete) + sender2.expectMsg(Done) + sender3.expectMsg(Done) } - "external flush when write in progress and receiving a ResetPersistenceId" in { + "external flush when write in progress and receiving a ResetPersistenceId" in new Setup { val promiseForWrite = Promise[Done]() val (probe, ref) = setup( @@ -148,7 +163,7 @@ class TagWriterSpec ref ! TagWrite(tagName, Vector(e2)) ref.tell(Flush, flushSender.ref) ref ! ResetPersistenceId("tag-1", TagProgress("some-other-pid", 2, 4)) // check this preserves flush request - expectMsg(ResetPersistenceIdComplete) + sender.expectMsg(ResetPersistenceIdComplete) promiseForWrite.success(Done) probe.expectMsg(ProgressWrite("p1", 1, 1, e1.timeUuid)) probe.expectMsg(Vector(toEw(e2, 2))) @@ -156,7 +171,7 @@ class TagWriterSpec flushSender.expectMsg(FlushComplete) } - "flush on demand when query in progress and no messages in buffer" in { + "flush on demand when query in progress and no messages in buffer" in new Setup { val promiseForWrite = Promise[Done]() val (probe, ref) = setup( @@ -178,7 +193,7 @@ class TagWriterSpec flushSender.expectMsg(FlushComplete) } - "not write until batch has reached capacity" in { + "not write until batch has reached capacity" in new Setup { val (probe, ref) = setup(settings = defaultSettings.copy(maxBatchSize = 2)) val bucket = nowBucket() val e1 = event("p1", 1L, "e-1", bucket) @@ -195,7 +210,7 @@ class TagWriterSpec probe.expectNoMessage(waitDuration) } - "write multiple persistenceIds in the same batch" in { + "write multiple persistenceIds in the same batch" in new Setup { val (probe, ref) = setup(settings = defaultSettings.copy(maxBatchSize = 3)) val bucket = nowBucket() val p1e1 = event("p1", 1L, "e-1", bucket) @@ -209,7 +224,7 @@ class TagWriterSpec probe.expectMsg(ProgressWrite("p2", 1, 1, p2e1.timeUuid)) } - "flush after interval" in { + "flush after interval" in new Setup { val (probe, ref) = setup(settings = defaultSettings.copy(maxBatchSize = 2, flushInterval = 500.millis)) val bucket = nowBucket() @@ -226,7 +241,7 @@ class TagWriterSpec probe.expectMsg(ProgressWrite("p1", 2, 2, e2.timeUuid)) } - "flush after interval when new events are written" in { + "flush after interval when new events are written" in new Setup { val (probe, ref) = setup(settings = defaultSettings.copy(maxBatchSize = 100, flushInterval = 500.millis)) val bucket = nowBucket() @@ -239,7 +254,7 @@ class TagWriterSpec Thread.sleep(200) if (n == 3) { probe.within(200.millis) { - probe.expectMsg(events.map(evt => toEw(evt, evt.sequenceNr)).toVector) + probe.expectMsg(events.map(evt => toEw(evt, evt.sequenceNr))) probe.expectMsg( 
ProgressWrite("p1", events.last.sequenceNr, events.last.sequenceNr, events.last.timeUuid)) } @@ -248,7 +263,7 @@ class TagWriterSpec } val remainingFlushedEvents = allEvents.drop(3) - probe.expectMsg(remainingFlushedEvents.map(evt => toEw(evt, evt.sequenceNr)).toVector) + probe.expectMsg(remainingFlushedEvents.map(evt => toEw(evt, evt.sequenceNr))) probe.expectMsg( ProgressWrite( "p1", @@ -258,7 +273,7 @@ class TagWriterSpec } - "flush when time bucket changes" in { + "flush when time bucket changes" in new Setup { val (probe, ref) = setup(settings = defaultSettings.copy(maxBatchSize = 3)) val bucket = nowBucket() val nextBucket = bucket.next() @@ -281,27 +296,34 @@ class TagWriterSpec probe.expectMsg(ProgressWrite("p1", 4, 4, e4.timeUuid)) } - "flush if time bucket changes within a single msg" in { + "flush if time bucket changes within a single msg" in new Setup { val (probe, ref) = setup(settings = defaultSettings.copy(maxBatchSize = 3)) + val sender1 = TestProbe() + val sender2 = TestProbe() val bucket = nowBucket() val nextBucket = bucket.next() val e1 = event("p1", 1L, "e-1", bucket) val e2 = event("p1", 2L, "e-2", nextBucket) - ref ! TagWrite(tagName, Vector(e1, e2)) + ref.tell(TagWrite(tagName, Vector(e1, e2)), sender1.ref) probe.expectMsg(Vector(toEw(e1, 1))) probe.expectMsg(ProgressWrite("p1", 1, 1, e1.timeUuid)) probe.expectNoMessage(waitDuration) + // e2 hasn't been written + sender1.expectNoMessage(waitDuration) val e3 = event("p1", 3L, "e-3", nextBucket) val e4 = event("p1", 4L, "e-4", nextBucket) - ref ! TagWrite(tagName, Vector(e3, e4)) + ref.tell(TagWrite(tagName, Vector(e3, e4)), sender2.ref) // batch size has now been hit probe.expectMsg(Vector(toEw(e2, 2), toEw(e3, 3), toEw(e4, 4))) probe.expectMsg(ProgressWrite("p1", 4, 4, e4.timeUuid)) + // sender1 should only respond now as e2 wasn't written until now + sender1.expectMsg(Done) + sender2.expectMsg(Done) } - "not execute query N+1 while query N is outstanding" in { + "not execute query N+1 while query N is outstanding" in new Setup { val promiseForWrite = Promise[Done]() val (probe, ref) = setup( @@ -323,7 +345,7 @@ class TagWriterSpec probe.expectMsg(ProgressWrite("p1", 4, 4, e4.timeUuid)) } - "internal flush time buckets one by one if arrive in same msg" in { + "internal flush time buckets one by one if arrive in same msg" in new Setup { val promiseForWrite = Promise[Done]() val (probe, ref) = setup( @@ -354,7 +376,7 @@ class TagWriterSpec probe.expectMsg(ProgressWrite("p1", 4, 4, e4.timeUuid)) } - "do not internal flush immediately if interval set to 0" in { + "do not internal flush immediately if interval set to 0" in new Setup { val (probe, ref) = setup(settings = defaultSettings.copy(flushInterval = Duration.Zero)) val bucket = nowBucket() @@ -368,7 +390,7 @@ class TagWriterSpec probe.expectMsg(ProgressWrite("p1", 2, 2, e2.timeUuid)) } - "do not internal flush if write in progress" in { + "do not internal flush if write in progress" in new Setup { val promiseForWrite = Promise[Done]() val (probe, ref) = setup( @@ -391,7 +413,7 @@ class TagWriterSpec probe.expectMsg(ProgressWrite("p1", 4, 4, e4.timeUuid)) } - "do not internal flush if write in progress with no interval" in { + "do not internal flush if write in progress with no interval" in new Setup { val promiseForWrite = Promise[Done]() val (probe, ref) = setup( writeResponse = Stream(promiseForWrite.future) ++ Stream.continually(Future.successful(Done)), @@ -413,7 +435,7 @@ class TagWriterSpec probe.expectMsg(ProgressWrite("p1", 4, 4, e4.timeUuid)) } - "not 
flush if internal flush is in progress" in { + "not flush if internal flush is in progress" in new Setup { val promiseForWrite = Promise[Done]() val (probe, ref) = setup( writeResponse = Stream(promiseForWrite.future) ++ Stream.continually(Future.successful(Done)), @@ -438,12 +460,12 @@ probe.expectMsg(ProgressWrite("p1", 3, 3, e3.timeUuid)) } - "resume from existing sequence nr" in { + "resume from existing sequence nr" in new Setup { val progress = TagProgress("p1", 100, 10) val (probe, ref) = setup(settings = defaultSettings.copy(maxBatchSize = 1)) val bucket = nowBucket() ref ! ResetPersistenceId(tagName, progress) - expectMsg(ResetPersistenceIdComplete) + sender.expectMsg(ResetPersistenceIdComplete) val e1 = event("p1", 101L, "e-1", bucket) ref ! TagWrite(tagName, Vector(e1)) @@ -457,7 +479,10 @@ probe.expectMsg(ProgressWrite("p1", 102, 12, e2.timeUuid)) } - "handle timeuuids coming out of order" in { + // Q. when would this ever happen? Time uuids are always increasing + // A. the send to the tag writer is in a future callback so writes + // from different persistence ids can overtake each other + "handle timeuuids coming out of order" in new Setup { val (probe, ref) = setup(settings = defaultSettings.copy(maxBatchSize = 4)) val currentBucket = (0 to 2).map { _ => val uuid = Uuids.timeBased() @@ -483,23 +508,24 @@ probe.expectMsg(ProgressWrite("p1", 2, 2, p1e2.timeUuid)) } - "update expected sequence nr on reset persistence id request" in { + "update expected sequence nr on reset persistence id request" in new Setup { val pid = "p-1" val initialProgress = TagProgress(pid, 10, 10) val resetProgress = TagProgress(pid, 5, 5) val (probe, ref) = setup(settings = defaultSettings.copy(maxBatchSize = 1)) val bucket = nowBucket() - ref ! ResetPersistenceId(tagName, initialProgress) - expectMsg(ResetPersistenceIdComplete) + val resetPidSender = TestProbe("resetPidSender") + ref.tell(ResetPersistenceId(tagName, initialProgress), resetPidSender.ref) + resetPidSender.expectMsg(ResetPersistenceIdComplete) val e11 = event(pid, 11L, "e-11", bucket) ref ! TagWrite(tagName, Vector(e11)) probe.expectMsg(Vector(toEw(e11, 11))) probe.expectMsg(ProgressWrite(pid, 11, 11, e11.timeUuid)) - ref ! ResetPersistenceId(tagName, resetProgress) - expectMsg(ResetPersistenceIdComplete) + ref.tell(ResetPersistenceId(tagName, resetProgress), resetPidSender.ref) + resetPidSender.expectMsg(ResetPersistenceIdComplete) // simulating a restart and recovery starting earlier val e6 = event(pid, 6L, "e-6", bucket) @@ -508,7 +534,7 @@ probe.expectMsg(ProgressWrite(pid, 6, 6, e6.timeUuid)) } - "update expected sequence nr on reset persistence id request (when write in progress)" in { + "update expected sequence nr on reset persistence id request (when write in progress)" in new Setup { val pid = "p-1" val initialProgress = TagProgress(pid, 10, 10) val resetProgress = TagProgress(pid, 5, 5) @@ -520,13 +546,13 @@ val bucket = nowBucket() ref ! ResetPersistenceId(tagName, initialProgress) - expectMsg(ResetPersistenceIdComplete) + sender.expectMsg(ResetPersistenceIdComplete) val e11 = event(pid, 11L, "e-11", bucket) ref ! TagWrite(tagName, Vector(e11)) probe.expectMsg(Vector(toEw(e11, 11))) ref !
ResetPersistenceId(tagName, resetProgress) - expectMsg(ResetPersistenceIdComplete) + sender.expectMsg(ResetPersistenceIdComplete) writeInProgressPromise.success(Done) probe.expectMsg(ProgressWrite(pid, 11, 11, e11.timeUuid)) @@ -536,7 +562,7 @@ probe.expectMsg(ProgressWrite(pid, 6, 6, e6.timeUuid)) } - "drop outstanding events for a persistence id when reset" in { + "drop outstanding events for a persistence id when reset" in new Setup { val pid = "p-1" // disable any automatic flushing val (probe, ref) = setup(settings = defaultSettings.copy(maxBatchSize = 100, flushInterval = 60.seconds)) @@ -549,17 +575,18 @@ val resetRequest = ResetPersistenceId(tagName, TagProgress(pid, 1, 1)) ref ! resetRequest - expectMsg(ResetPersistenceIdComplete) + sender.expectMsg(ResetPersistenceIdComplete) // can send 2 and 3 again due to the reset ref ! TagWrite(tagName, Vector(e2, e3)) - ref ! Flush - expectMsg(FlushComplete) + val flushSender = TestProbe("flushSender") + ref.tell(Flush, flushSender.ref) + flushSender.expectMsg(FlushComplete) probe.expectMsg(Vector(toEw(e2, 2), toEw(e3, 3))) } - "forget about a persistence id when idle" in { + "forget about a persistence id when idle" in new Setup { val (probe, underTest) = setup(settings = defaultSettings.copy(maxBatchSize = 1, flushInterval = 60.seconds)) val pid = "p1" val bucket = nowBucket() @@ -579,7 +606,7 @@ probe.expectMsg(Vector(toEw(e2, 1))) } - "forget about a persistence id when write in progress)" in { + "forget about a persistence id when write in progress" in new Setup { val pid = "p-1" val writeInProgressPromise = Promise[Done]() val (probe, underTest) = @@ -603,20 +630,20 @@ } - "passivate when idle" in { + "passivate when idle" in new Setup { val parent = TestProbe() val idleTimeout = 100.millis - val (probe, ref) = + val (_, ref) = setupWithParent( settings = defaultSettings.copy(maxBatchSize = 2, stopTagWriterWhenIdle = idleTimeout), parent = parent.ref) parent.expectMsg(PassivateTagWriter("tag-1")) ref.tell(StopTagWriter, parent.ref) - watch(ref) - expectTerminated(ref) + sender.watch(ref) + sender.expectTerminated(ref) } - "do not passivate if write in progress" in { + "do not passivate if write in progress" in new Setup { val promiseForWrite = Promise[Done]() val parent = TestProbe() val idleTimeout = 1.second @@ -641,7 +668,7 @@ "Tag writer error scenarios" must { - "handle tag writes view failing" in { + "handle tag writes view failing" in new Setup { val t = TestEx("Tag write failed") val (probe, ref) = setup( settings = defaultSettings.copy(maxBatchSize = 2), @@ -672,7 +699,7 @@ probe.expectMsg(ProgressWrite("p1", 4, 4, e4.timeUuid)) } - "handle tag progress write failing" in { + "handle tag progress write failing" in new Setup { val t = TestEx("Tag progress write has failed") val (probe, ref) = setup( @@ -726,8 +753,8 @@ val session = new TagWritersSession(null, "unused", "unused", null) { - override def writeBatch(tag: Tag, events: Seq[(Serialized, Long)])(implicit ec: ExecutionContext) = { - probe.ref ! events.map { + override def writeBatch(tag: Tag, write: Buffer)(implicit ec: ExecutionContext) = { + probe.ref !
write.nextBatch.flatten(_.events).map { case (event, tagPidSequenceNr) => toEw(event, tagPidSequenceNr) } val (result, tail) = (writeResponseStream.head, writeResponseStream.tail) diff --git a/core/src/test/scala/akka/persistence/cassandra/journal/TagWritersSpec.scala b/core/src/test/scala/akka/persistence/cassandra/journal/TagWritersSpec.scala index 109ac84b1..a77936f07 100644 --- a/core/src/test/scala/akka/persistence/cassandra/journal/TagWritersSpec.scala +++ b/core/src/test/scala/akka/persistence/cassandra/journal/TagWritersSpec.scala @@ -6,6 +6,7 @@ package akka.persistence.cassandra.journal import java.nio.ByteBuffer +import akka.Done import akka.actor.{ Actor, ActorRef, ActorSystem, PoisonPill, Props } import akka.persistence.cassandra.Hour import akka.persistence.cassandra.journal.CassandraJournal.Serialized @@ -14,13 +15,26 @@ import akka.persistence.cassandra.journal.TagWriters._ import akka.testkit.{ ImplicitSender, TestKit, TestProbe } import akka.util.Timeout import com.datastax.oss.driver.api.core.uuid.Uuids +import com.typesafe.config.{ Config, ConfigFactory } import org.scalatest.BeforeAndAfterAll import org.scalatest.wordspec.AnyWordSpecLike import org.scalatest.matchers.should.Matchers + import scala.concurrent.duration._ +object TagWritersSpec { + val config: Config = + ConfigFactory.parseString(""" + akka { + use-slf4j = off + loglevel = INFO + } + """) + +} + class TagWritersSpec - extends TestKit(ActorSystem("TagWriterSpec")) + extends TestKit(ActorSystem("TagWriterSpec", TagWritersSpec.config)) with AnyWordSpecLike with BeforeAndAfterAll with ImplicitSender @@ -53,17 +67,28 @@ class TagWritersSpec } "Tag writers" must { - "forward flush requests" in { - val probe = TestProbe() - val tagWriters = system.actorOf(testProps(defaultSettings, tag => { - tag shouldEqual "blue" - probe.ref - })) - tagWriters ! TagFlush("blue") - probe.expectMsg(Flush) - probe.reply(FlushComplete) - expectMsg(FlushComplete) // should be forwarded + "reply when all tag writes are complete" in { + val redProbe = TestProbe() + val blueProbe = TestProbe() + val probes = Map("red" -> redProbe, "blue" -> blueProbe) + val tagWriters = system.actorOf(testProps(defaultSettings, tag => probes(tag).ref)) + initializePid(tagWriters) + + val blueTagWrite = TagWrite("blue", List(event(pid, 0, "cat", Set("red", "blue")))) + val redTagWrite = TagWrite("red", List(event(pid, 0, "dog", Set("red", "blue")))) + + tagWriters ! BulkTagWrite(List(blueTagWrite, redTagWrite), Nil) + + expectNoMessage() + + redProbe.expectMsg(redTagWrite) + blueProbe.expectMsg(blueTagWrite) + + redProbe.reply(Done) + expectNoMessage() + blueProbe.reply(Done) + expectMsg(Done) } "flush all tag writers" in { @@ -146,8 +171,10 @@ class TagWritersSpec val tagWriters = system.actorOf(testProps(defaultSettings, _ => probe.ref)) initializePid(tagWriters) - tagWriters ! TagFlush("blue") - probe.expectMsg(Flush) + // send a tag write so that it is created + val preWrite = TagWrite("blue", List(dummySerialized("blue"))) + tagWriters ! preWrite + probe.expectMsg(preWrite) tagWriters.tell(PassivateTagWriter("blue"), probe.ref) probe.expectMsg(StopTagWriter) @@ -169,6 +196,11 @@ class TagWritersSpec } } + private def event(pid: String, seqNr: Long, payload: String, tags: Set[String]): Serialized = { + val uuid = Uuids.timeBased() + Serialized(pid, seqNr, ByteBuffer.wrap(payload.getBytes()), tags, "", "", 1, "", None, uuid, TimeBucket(uuid, Hour)) + } + override protected def afterAll(): Unit = shutdown() }
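For reference, the ask-based acknowledgement that CassandraJournal.asyncWriteMessages now performs boils down to the following sketch. This is a simplified illustration, not code from the diff; `tagWriters` and `tagWrite` are hypothetical stand-ins, and the 4 second timeout mirrors the new tag-write-timeout default:

import akka.Done
import akka.actor.ActorRef
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.duration._

// `tagWriters` is the optional TagWriters actor; `tagWrite` is the message built from the serialized events
def ackTaggedWrite(tagWriters: Option[ActorRef], tagWrite: Any)(implicit ec: ExecutionContext): Future[Done] = {
  implicit val timeout: Timeout = Timeout(4.seconds) // mirrors tag-write-timeout
  tagWriters match {
    case Some(ref) =>
      // the TagWriter replies with Done only once the events are in tag_views, so a
      // timeout here fails the persist and the journal actor is stopped and restarted
      ref.ask(tagWrite).map(_ => Done)
    case None => Future.successful(Done)
  }
}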