diff --git a/core/src/test/scala/akka/persistence/jdbc/query/JournalDaoStreamMessagesMemoryTest.scala b/core/src/test/scala/akka/persistence/jdbc/query/JournalDaoStreamMessagesMemoryTest.scala index 7fe1d76b..e1835c52 100644 --- a/core/src/test/scala/akka/persistence/jdbc/query/JournalDaoStreamMessagesMemoryTest.scala +++ b/core/src/test/scala/akka/persistence/jdbc/query/JournalDaoStreamMessagesMemoryTest.scala @@ -5,25 +5,26 @@ package akka.persistence.jdbc.query -import java.lang.management.ManagementFactory -import java.lang.management.MemoryMXBean -import java.util.UUID - -import akka.actor.ActorSystem +import akka.actor.{ ActorSystem, ExtendedActorSystem } +import akka.persistence.jdbc.config.JournalConfig +import akka.persistence.jdbc.journal.dao.JournalDao import akka.persistence.{ AtomicWrite, PersistentRepr } -import akka.persistence.jdbc.journal.dao.legacy.{ ByteArrayJournalDao, JournalTables } -import akka.serialization.SerializationExtension +import akka.serialization.{ Serialization, SerializationExtension } import akka.stream.scaladsl.{ Sink, Source } +import akka.stream.testkit.scaladsl.TestSink +import akka.stream.{ Materializer, SystemMaterializer } import com.typesafe.config.{ ConfigValue, ConfigValueFactory } import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.slf4j.LoggerFactory +import slick.jdbc.JdbcBackend.Database +import slick.jdbc.JdbcProfile +import java.lang.management.{ ManagementFactory, MemoryMXBean } +import java.util.UUID import scala.collection.immutable -import scala.concurrent.ExecutionContextExecutor import scala.concurrent.duration._ +import scala.concurrent.{ ExecutionContext, ExecutionContextExecutor } import scala.util.{ Failure, Success } -import akka.stream.testkit.scaladsl.TestSink -import org.scalatest.matchers.should.Matchers object JournalDaoStreamMessagesMemoryTest { @@ -33,33 +34,34 @@ object JournalDaoStreamMessagesMemoryTest { } abstract class JournalDaoStreamMessagesMemoryTest(configFile: String) - extends QueryTestSpec(configFile, JournalDaoStreamMessagesMemoryTest.configOverrides) - with JournalTables - with Matchers { + extends QueryTestSpec(configFile, JournalDaoStreamMessagesMemoryTest.configOverrides) { import JournalDaoStreamMessagesMemoryTest.MB private val log = LoggerFactory.getLogger(this.getClass) - val journalSequenceActorConfig = readJournalConfig.journalSequenceRetrievalConfiguration - val journalTableCfg = journalConfig.journalTableConfiguration - - implicit val askTimeout: FiniteDuration = 50.millis - - def generateId: Int = 0 - val memoryMBean: MemoryMXBean = ManagementFactory.getMemoryMXBean - behavior.of("Replaying Persistence Actor") - it should "stream events" in { - if (newDao) - pending withActorSystem { implicit system: ActorSystem => withDatabase { db => implicit val ec: ExecutionContextExecutor = system.dispatcher + implicit val mat: Materializer = SystemMaterializer(system).materializer val persistenceId = UUID.randomUUID().toString - val dao = new ByteArrayJournalDao(db, profile, journalConfig, SerializationExtension(system)) + val writerUuid = UUID.randomUUID().toString + val fqcn = journalConfig.pluginConfig.dao + val args = Seq( + (classOf[Database], db), + (classOf[JdbcProfile], profile), + (classOf[JournalConfig], journalConfig), + (classOf[Serialization], SerializationExtension(system)), + (classOf[ExecutionContext], ec), + (classOf[Materializer], mat)) + val dao: JournalDao = + system.asInstanceOf[ExtendedActorSystem].dynamicAccess.createInstanceFor[JournalDao](fqcn, args) match { + case Success(dao) => dao + case Failure(cause) => throw cause + } val payloadSize = 5000 // 5000 bytes val eventsPerBatch = 1000 @@ -87,8 +89,8 @@ abstract class JournalDaoStreamMessagesMemoryTest(configFile: String) log.info(s"batch $i - events from $start to $end") val atomicWrites = (start to end).map { j => - AtomicWrite(immutable.Seq(PersistentRepr(payload, j, persistenceId))) - }.toSeq + AtomicWrite(immutable.Seq(PersistentRepr(payload, j, persistenceId, writerUuid = writerUuid))) + } dao.asyncWriteMessages(atomicWrites).map(_ => i) }