Skip to content

Commit

Permalink
Merge pull request #859 from sebastian-alfers/bump-akka-2.10.0-M1
Browse files Browse the repository at this point in the history
chore: bump to akka 2.10.0-M1, align with changes from upstream
  • Loading branch information
sebastian-alfers authored Sep 20, 2024
2 parents a8f5e5b + 6c84bca commit 0f66ec0
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import akka.persistence.jdbc.config.{ ConfigKeys, SlickConfiguration }
import akka.persistence.jdbc.util.ConfigOps._
import com.typesafe.config.{ Config, ConfigObject }

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.util.{ Failure, Success }

object SlickExtension extends ExtensionId[SlickExtensionImpl] with ExtensionIdProvider {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import scala.concurrent.Future
import scala.util.Try

import akka.NotUsed
import akka.dispatch.ExecutionContexts
import akka.persistence.jdbc.AkkaSerialization
import akka.persistence.jdbc.config.BaseDaoConfig
import akka.persistence.jdbc.config.JournalConfig
Expand Down Expand Up @@ -46,7 +45,7 @@ class DefaultJournalDao(
override def baseDaoConfig: BaseDaoConfig = journalConfig.daoConfig

override def writeJournalRows(xs: immutable.Seq[(JournalAkkaSerializationRow, Set[String])]): Future[Unit] = {
db.run(queries.writeJournalRows(xs).transactionally).map(_ => ())(ExecutionContexts.parasitic)
db.run(queries.writeJournalRows(xs).transactionally).map(_ => ())(ExecutionContext.parasitic)
}

val queries =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import akka.persistence.jdbc.config.SnapshotConfig
import akka.serialization.Serialization
import akka.stream.Materializer
import SnapshotTables._
import akka.dispatch.ExecutionContexts
import akka.persistence.jdbc.AkkaSerialization

import scala.concurrent.{ ExecutionContext, Future }
Expand Down Expand Up @@ -97,23 +96,23 @@ class DefaultSnapshotDao(

override def save(snapshotMetadata: SnapshotMetadata, snapshot: Any): Future[Unit] = {
val eventualSnapshotRow = Future.fromTry(serializeSnapshot(snapshotMetadata, snapshot))
eventualSnapshotRow.map(queries.insertOrUpdate).flatMap(db.run).map(_ => ())(ExecutionContexts.parasitic)
eventualSnapshotRow.map(queries.insertOrUpdate).flatMap(db.run).map(_ => ())(ExecutionContext.parasitic)
}

override def delete(persistenceId: String, sequenceNr: Long): Future[Unit] =
db.run(queries.selectByPersistenceIdAndSequenceNr((persistenceId, sequenceNr)).delete)
.map(_ => ())(ExecutionContexts.parasitic)
.map(_ => ())(ExecutionContext.parasitic)

override def deleteAllSnapshots(persistenceId: String): Future[Unit] =
db.run(queries.selectAll(persistenceId).delete).map(_ => ())((ExecutionContexts.parasitic))
db.run(queries.selectAll(persistenceId).delete).map(_ => ())((ExecutionContext.parasitic))

override def deleteUpToMaxSequenceNr(persistenceId: String, maxSequenceNr: Long): Future[Unit] =
db.run(queries.selectByPersistenceIdUpToMaxSequenceNr((persistenceId, maxSequenceNr)).delete)
.map(_ => ())((ExecutionContexts.parasitic))
.map(_ => ())((ExecutionContext.parasitic))

override def deleteUpToMaxTimestamp(persistenceId: String, maxTimestamp: Long): Future[Unit] =
db.run(queries.selectByPersistenceIdUpToMaxTimestamp((persistenceId, maxTimestamp)).delete)
.map(_ => ())((ExecutionContexts.parasitic))
.map(_ => ())((ExecutionContext.parasitic))

override def deleteUpToMaxSequenceNrAndMaxTimestamp(
persistenceId: String,
Expand All @@ -123,5 +122,5 @@ class DefaultSnapshotDao(
queries
.selectByPersistenceIdUpToMaxSequenceNrAndMaxTimestamp((persistenceId, maxSequenceNr, maxTimestamp))
.delete)
.map(_ => ())((ExecutionContexts.parasitic))
.map(_ => ())((ExecutionContext.parasitic))
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ package akka.persistence.jdbc.state.javadsl

import java.util.Optional
import java.util.concurrent.CompletionStage
import scala.compat.java8.FutureConverters._
import scala.jdk.FutureConverters._
import scala.concurrent.ExecutionContext
import akka.annotation.ApiMayChange
import slick.jdbc.JdbcProfile
Expand Down Expand Up @@ -40,21 +40,21 @@ class JdbcDurableStateStore[A](
val queries = new DurableStateQueries(profile, durableStateConfig)

def getObject(persistenceId: String): CompletionStage[GetObjectResult[A]] =
toJava(
scalaStore
.getObject(persistenceId)
.map(x => GetObjectResult(Optional.ofNullable(x.value.getOrElse(null.asInstanceOf[A])), x.revision)))
scalaStore
.getObject(persistenceId)
.map(x => GetObjectResult(Optional.ofNullable(x.value.getOrElse(null.asInstanceOf[A])), x.revision))
.asJava

def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): CompletionStage[Done] =
toJava(scalaStore.upsertObject(persistenceId, revision, value, tag))
scalaStore.upsertObject(persistenceId, revision, value, tag).asJava

@deprecated(message = "Use the deleteObject overload with revision instead.", since = "1.0.0")
override def deleteObject(persistenceId: String): CompletionStage[Done] =
deleteObject(persistenceId, revision = 0)

@nowarn("msg=deprecated")
override def deleteObject(persistenceId: String, revision: Long): CompletionStage[Done] =
toJava(scalaStore.deleteObject(persistenceId))
scalaStore.deleteObject(persistenceId).asJava

def currentChanges(tag: String, offset: Offset): Source[DurableStateChange[A], NotUsed] =
scalaStore.currentChanges(tag, offset).asJava
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ package akka.persistence.jdbc.testkit.javadsl

import java.util.concurrent.CompletionStage

import scala.compat.java8.FutureConverters._
import scala.jdk.FutureConverters._

import akka.Done
import akka.actor.ClassicActorSystemProvider
Expand Down Expand Up @@ -53,7 +53,7 @@ object SchemaUtils {
*/
@ApiMayChange
def dropIfExists(configKey: String, actorSystem: ClassicActorSystemProvider): CompletionStage[Done] =
SchemaUtilsImpl.dropIfExists(configKey, logger)(actorSystem).toJava
SchemaUtilsImpl.dropIfExists(configKey, logger)(actorSystem).asJava

/**
* Creates the schema for both the journal and the snapshot table using the default schema definition.
Expand Down Expand Up @@ -89,7 +89,7 @@ object SchemaUtils {
*/
@ApiMayChange
def createIfNotExists(configKey: String, actorSystem: ClassicActorSystemProvider): CompletionStage[Done] =
SchemaUtilsImpl.createIfNotExists(configKey, logger)(actorSystem).toJava
SchemaUtilsImpl.createIfNotExists(configKey, logger)(actorSystem).asJava

/**
* This method can be used to load alternative DDL scripts.
Expand Down Expand Up @@ -125,5 +125,5 @@ object SchemaUtils {
separator: String,
configKey: String,
actorSystem: ClassicActorSystemProvider): CompletionStage[Done] =
SchemaUtilsImpl.applyScript(script, separator, configKey, logger)(actorSystem).toJava
SchemaUtilsImpl.applyScript(script, separator, configKey, logger)(actorSystem).asJava
}
11 changes: 5 additions & 6 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ object Dependencies {

val ScalaVersions = Seq(Scala213, Scala3)

val AkkaVersion = "2.9.3"
val AkkaVersion = "2.10.0-M1"
val AkkaBinaryVersion = AkkaVersion.take(3)

val SlickVersion = "3.5.1"
Expand All @@ -21,11 +21,10 @@ object Dependencies {

val Libraries: Seq[ModuleID] = Seq(
"com.typesafe.akka" %% "akka-persistence-query" % AkkaVersion,
// Slick 3.5 pulls in slf4j-api 2.2 which doesn't work with Akka
("com.typesafe.slick" %% "slick" % SlickVersion).exclude("org.slf4j", "slf4j-api"),
"org.slf4j" % "slf4j-api" % "1.7.36",
"com.typesafe.slick" %% "slick" % SlickVersion,
"org.slf4j" % "slf4j-api" % "2.0.16",
"com.typesafe.slick" %% "slick-hikaricp" % SlickVersion,
"ch.qos.logback" % "logback-classic" % "1.2.13" % Test,
"ch.qos.logback" % "logback-classic" % "1.5.7" % Test,
"com.typesafe.akka" %% "akka-slf4j" % AkkaVersion % Test,
"com.typesafe.akka" %% "akka-persistence-tck" % AkkaVersion % Test,
"com.typesafe.akka" %% "akka-stream-testkit" % AkkaVersion % Test,
Expand All @@ -34,7 +33,7 @@ object Dependencies {

val Migration: Seq[ModuleID] = Seq(
"com.typesafe" % "config" % "1.4.3",
"ch.qos.logback" % "logback-classic" % "1.2.13",
"ch.qos.logback" % "logback-classic" % "1.5.7",
"org.testcontainers" % "postgresql" % "1.20.1" % Test,
"org.scalatest" %% "scalatest" % ScalaTestVersion % Test) ++ JdbcDrivers.map(_ % Provided)
}

0 comments on commit 0f66ec0

Please sign in to comment.