Skip to content

Commit

Permalink
Merge pull request #542 from akka/wip-320-stop-TagWriter-patriknw
Browse files Browse the repository at this point in the history
Stop TagWriter when it's idle, #320
  • Loading branch information
patriknw authored Jan 20, 2020
2 parents 0bfd368 + 4a16e66 commit 9bc0134
Show file tree
Hide file tree
Showing 6 changed files with 227 additions and 24 deletions.
3 changes: 3 additions & 0 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,9 @@ cassandra-plugin {
# Valid options: Day, Hour, Minute
bucket-size = "Hour"

# The actor responsible for writing a tag is stopped if the tag isn't used for this duration.
stop-tag-writer-when-idle = 10s

# How long to look for delayed events
# This works by adding an additional (internal) sequence number to each tag / persistence id
# event stream so that the read side can detect missing events. When a gap is detected no new events
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ class CassandraJournalConfig(system: ActorSystem, config: Config)
eventsByTagConfig.getInt("max-message-batch-size"),
eventsByTagConfig.getDuration("flush-interval", TimeUnit.MILLISECONDS).millis,
eventsByTagConfig.getDuration("scanning-flush-interval", TimeUnit.MILLISECONDS).millis,
eventsByTagConfig.getDuration("stop-tag-writer-when-idle", TimeUnit.MILLISECONDS).millis,
pubsubNotificationInterval)

val coordinatedShutdownOnError: Boolean = config.getBoolean("coordinated-shutdown-on-error")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@ import akka.persistence.cassandra.journal.CassandraJournal._
import akka.persistence.cassandra.journal.TagWriter.TagWriterSettings
import akka.persistence.cassandra.journal.TagWriters.TagWritersSession
import akka.persistence.cassandra.query.UUIDComparator

import scala.concurrent.duration.{ Duration, FiniteDuration }
import scala.util.control.NonFatal
import scala.util.{ Failure, Success, Try }
import scala.concurrent.duration._

import akka.actor.ReceiveTimeout

/*
* Groups writes into un-logged batches for the same partition.
*
Expand All @@ -35,13 +36,14 @@ import scala.concurrent.duration._
*/
@InternalApi private[akka] object TagWriter {

private[akka] def props(settings: TagWriterSettings, session: TagWritersSession, tag: Tag): Props =
Props(new TagWriter(settings, session, tag))
private[akka] def props(settings: TagWriterSettings, session: TagWritersSession, tag: Tag, parent: ActorRef): Props =
Props(new TagWriter(settings, session, tag, parent))

private[akka] case class TagWriterSettings(
maxBatchSize: Int,
flushInterval: FiniteDuration,
scanningFlushInterval: FiniteDuration,
stopTagWriterWhenIdle: FiniteDuration,
pubsubNotification: Duration)

private[akka] case class TagProgress(
Expand All @@ -63,6 +65,16 @@ import scala.concurrent.duration._
private[akka] case object Flush
private[akka] case object FlushComplete

// The "passivate pattern" is used to avoid loosing messages between TagWriters (parent)
// and TagWriter when the TagWriter is stopped due to inactivity.
// When idle for longer than configured stopTagWriterWhenIdle the TagWriter sends `PassivateTagWriter` to parent,
// which replies with `StopTagWriter` and starts buffering incoming messages for the tag.
// The TagWriter stops when receiving StopTagWriter if it is still ok to passivate (no state, nothing in progress).
// TagWriters (parent) sends buffered messages if any when the TagWriter has been terminated.
private[akka] final case class PassivateTagWriter(tag: String)
private[akka] final case class CancelPassivateTagWriter(tag: String)
private[akka] case object StopTagWriter

type TagWriteSummary = Map[PersistenceId, PidProgress]
case class PidProgress(seqNrFrom: SequenceNr, seqNrTo: SequenceNr, tagPidSequenceNr: TagPidSequenceNr, offset: UUID)
private case object InternalFlush
Expand All @@ -80,7 +92,11 @@ import scala.concurrent.duration._
}
}

@InternalApi private[akka] class TagWriter(settings: TagWriterSettings, session: TagWritersSession, tag: String)
@InternalApi private[akka] class TagWriter(
settings: TagWriterSettings,
session: TagWritersSession,
tag: String,
parent: ActorRef)
extends Actor
with Timers
with ActorLogging
Expand All @@ -105,13 +121,14 @@ import scala.concurrent.duration._
}
}

private val parent = context.parent

var lastLoggedBufferNs: Long = -1
val bufferWarningMinDurationNs: Long = 5.seconds.toNanos

override def preStart(): Unit =
override def preStart(): Unit = {
log.debug("Running TagWriter for [{}] with settings {}", tag, settings)
if (settings.stopTagWriterWhenIdle > Duration.Zero)
context.setReceiveTimeout(settings.stopTagWriterWhenIdle)
}

override def receive: Receive =
idle(Vector.empty[(Serialized, TagPidSequenceNr)], Map.empty[String, Long])
Expand Down Expand Up @@ -150,6 +167,16 @@ import scala.concurrent.duration._
log.debug("Resetting pid {}. TagProgress {}", pid, tp)
become(idle(buffer.filterNot(_._1.persistenceId == pid), tagPidSequenceNrs + (pid -> tagPidSequenceNr)))
sender() ! ResetPersistenceIdComplete

case ReceiveTimeout =>
if (buffer.isEmpty && tagPidSequenceNrs.isEmpty)
parent ! PassivateTagWriter(tag)

case StopTagWriter =>
if (buffer.isEmpty && tagPidSequenceNrs.isEmpty)
context.stop(self)
else
parent ! CancelPassivateTagWriter(tag)
}

private def writeInProgress(
Expand Down Expand Up @@ -234,6 +261,13 @@ import scala.concurrent.duration._
tagPidSequenceNrs + (pid -> tp.pidTagSequenceNr),
awaitingFlush))
sender() ! ResetPersistenceIdComplete

case ReceiveTimeout =>
// not idle

case StopTagWriter =>
// not idle any more
parent ! CancelPassivateTagWriter(tag)
}

private def sendPubsubNotification(): Unit = {
Expand Down
125 changes: 109 additions & 16 deletions core/src/main/scala/akka/persistence/cassandra/journal/TagWriters.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import java.lang.{ Integer => JInt, Long => JLong }
import java.net.URLEncoder
import java.util.UUID

import scala.concurrent.Promise

import akka.Done
import akka.actor.SupervisorStrategy.Escalate
import akka.pattern.ask
Expand All @@ -27,6 +29,7 @@ import akka.event.LoggingAdapter
import akka.persistence.cassandra.journal.CassandraJournal._
import akka.persistence.cassandra.journal.TagWriter._
import akka.persistence.cassandra.journal.TagWriters._
import akka.util.ByteString
import akka.util.Timeout
import com.datastax.oss.driver.api.core.cql.{
BatchStatementBuilder,
Expand All @@ -41,7 +44,6 @@ import scala.concurrent.duration._
import scala.util.Failure
import scala.util.Success
import scala.util.Try
import akka.util.ByteString

@InternalApi private[akka] object TagWriters {

Expand Down Expand Up @@ -152,6 +154,13 @@ import akka.util.ByteString
private case class WriteTagScanningCompleted(result: Try[Done], startTime: Long, size: Int)

private case class PersistentActorTerminated(pid: PersistenceId, ref: ActorRef)
private case class TagWriterTerminated(tag: String)

/**
* @param message the message to send
* @param tellOrAsk Left for the `sender` of tell, `right` for the promise of ask.
*/
private case class PassivateBufferEntry(message: Any, tellOrAsk: Either[ActorRef, Promise[Any]])
}

/**
Expand All @@ -173,6 +182,8 @@ import akka.util.ByteString
}

private var tagActors = Map.empty[String, ActorRef]
// When TagWriter is idle it starts passivation process. Incoming messages are buffered here.
private var passivatingTagActors = Map.empty[String, Vector[PassivateBufferEntry]]
// just used for local actor asks
private implicit val timeout: Timeout = Timeout(10.seconds)

Expand All @@ -190,18 +201,17 @@ import akka.util.ByteString
if (log.isDebugEnabled)
log.debug("Flushing all tag writers [{}]", tagActors.keySet.mkString(", "))
val replyTo = sender()
val flushes = tagActors.map {
case (tag, ref) =>
(ref ? Flush)
.mapTo[FlushComplete.type]
.map(fc => {
log.debug("Flush complete for tag {}", tag)
fc
})
val flushes = tagActors.keySet.map { tag =>
askTagActor(tag, Flush)
.mapTo[FlushComplete.type]
.map(fc => {
log.debug("Flush complete for tag {}", tag)
fc
})
}
Future.sequence(flushes).map(_ => AllFlushed).pipeTo(replyTo)
case TagFlush(tag) =>
tagActor(tag).tell(Flush, sender())
tellTagActor(tag, Flush, sender())
case tw: TagWrite =>
forwardTagWrite(tw)
case BulkTagWrite(tws, withoutTags) =>
Expand Down Expand Up @@ -250,13 +260,13 @@ import akka.util.ByteString
val tagWriterAcks = Future.sequence(tagProgresses.map {
case (tag, progress) =>
log.debug("Sending tag progress: [{}] [{}]", tag, progress)
(tagActor(tag) ? ResetPersistenceId(tag, progress)).mapTo[ResetPersistenceIdComplete.type]
askTagActor(tag, ResetPersistenceId(tag, progress)).mapTo[ResetPersistenceIdComplete.type]
})
// We send an empty progress in case the tag actor has buffered events
// and has never written any tag progress for this tag/pid
val blankTagWriterAcks = Future.sequence(missingProgress.map { tag =>
log.debug("Sending blank progress for tag [{}] pid [{}]", tag, pid)
(tagActor(tag) ? ResetPersistenceId(tag, TagProgress(pid, 0, 0))).mapTo[ResetPersistenceIdComplete.type]
askTagActor(tag, ResetPersistenceId(tag, TagProgress(pid, 0, 0))).mapTo[ResetPersistenceIdComplete.type]
})

val recoveryNotificationComplete = for {
Expand Down Expand Up @@ -296,6 +306,31 @@ import akka.util.ByteString
pid,
ref)
}

case PassivateTagWriter(tag) =>
tagActors.get(tag) match {
case Some(tagWriter) =>
if (!passivatingTagActors.contains(tag))
passivatingTagActors = passivatingTagActors.updated(tag, Vector.empty)
log.debug("Tag writer {} for tag [{}] is passivating", tagWriter, tag)
tagWriter ! StopTagWriter
case None =>
log.warning(
"Unknown tag [{}] in passivate request from {}. Please raise an issue with debug logs.",
tag,
sender())
}

case CancelPassivateTagWriter(tag) =>
passivatingTagActors.get(tag).foreach { buffer =>
passivatingTagActors = passivatingTagActors - tag
log.debug("Tag writer {} for tag [{}] canceled passivation.", sender, tag)
sendPassivateBuffer(tag, buffer)
}

case TagWriterTerminated(tag) =>
tagWriterTerminated(tag)

}

private def forwardTagWrite(tw: TagWrite): Unit = {
Expand All @@ -305,7 +340,7 @@ import akka.util.ByteString
tw.serialised.head.persistenceId)
} else {
updatePendingScanning(tw.serialised)
tagActor(tw.tag).forward(tw)
tellTagActor(tw.tag, tw, sender())
}
}

Expand Down Expand Up @@ -385,9 +420,67 @@ import akka.util.ByteString

// protected for testing purposes
protected def createTagWriter(tag: String): ActorRef = {
context.actorOf(
TagWriter.props(settings, tagWriterSession, tag).withDispatcher(context.props.dispatcher),
name = URLEncoder.encode(tag, ByteString.UTF_8))
context.watchWith(
context.actorOf(
TagWriter.props(settings, tagWriterSession, tag, self).withDispatcher(context.props.dispatcher),
name = URLEncoder.encode(tag, ByteString.UTF_8)),
TagWriterTerminated(tag))
}

private def tellTagActor(tag: String, message: Any, snd: ActorRef): Unit = {
passivatingTagActors.get(tag) match {
case Some(buffer) =>
passivatingTagActors = passivatingTagActors.updated(tag, buffer :+ PassivateBufferEntry(message, Left(snd)))
case None =>
tagActor(tag).tell(message, snd)
}
}

private def askTagActor(tag: String, message: Any)(implicit timeout: Timeout): Future[Any] = {
passivatingTagActors.get(tag) match {
case Some(buffer) =>
val p = Promise[Any]()
passivatingTagActors = passivatingTagActors.updated(tag, buffer :+ PassivateBufferEntry(message, Right(p)))
p.future
case None =>
tagActor(tag).ask(message)
}
}

private def tagWriterTerminated(tag: String): Unit = {
tagActors.get(tag) match {
case Some(ref) =>
passivatingTagActors.get(tag) match {
case Some(buffer) =>
tagActors = tagActors - tag
passivatingTagActors = passivatingTagActors - tag
if (buffer.isEmpty)
log.debug("Tag writer {} for tag [{}] terminated after passivation.", ref, tag)
else {
log.debug(
"Tag writer {} for tag [{}] terminated after passivation, but starting again " +
"because [{}] messages buffered.",
ref,
tag,
buffer.size)
sendPassivateBuffer(tag, buffer)
}
case None =>
log.warning(
"Tag writer {} for tag [{}] terminated without passivation. Please raise an issue with debug logs.",
ref,
tag)
tagActors = tagActors - tag
}
case None =>
log.warning("Unknown tag writer for tag [{}] terminated. Please raise an issue with debug logs.", tag)
}
}

private def sendPassivateBuffer(tag: String, buffer: Vector[PassivateBufferEntry]): Unit = {
buffer.foreach {
case PassivateBufferEntry(message, Left(snd)) => tellTagActor(tag, message, snd)
case PassivateBufferEntry(message, Right(promise)) => promise.completeWith(askTagActor(tag, message))
}
}
}
Loading

0 comments on commit 9bc0134

Please sign in to comment.