From 77d6c06a9d45c67a03e46426890120136cb077c4 Mon Sep 17 00:00:00 2001 From: Kory <6561358+kory33@users.noreply.github.com> Date: Tue, 27 Jul 2021 11:52:37 +0900 Subject: [PATCH 1/7] =?UTF-8?q?[update]=20RepositoryDefinition=E3=81=ABMap?= =?UTF-8?q?ped/Phased=E3=81=AEsub-trait=E3=82=92=E8=BF=BD=E5=8A=A0?= =?UTF-8?q?=E3=81=99=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../player/BukkitRepositoryControls.scala | 13 +- .../FiberAdjoinedRepositoryDefinition.scala | 2 +- .../MutexRepositoryDefinition.scala | 2 +- .../RefDictBackedRepositoryDefinition.scala | 6 +- .../SessionMutexRepositoryDefinition.scala | 2 +- .../SignallingRepositoryDefinition.scala | 5 +- .../template/RepositoryDefinition.scala | 171 ++++++++++-------- .../unchama/seichiassist/SeichiAssist.scala | 2 +- .../subsystems/breakcountbar/System.scala | 2 +- .../subsystems/buildcount/System.scala | 2 +- .../subsystems/fastdiggingeffect/System.scala | 2 +- .../subsystems/itemmigration/System.scala | 2 +- .../ManaRepositoryDefinition.scala | 2 +- .../ManaBarSynchronizationRepository.scala | 2 +- .../subsystems/mebius/System.scala | 4 +- 15 files changed, 119 insertions(+), 100 deletions(-) diff --git a/src/main/scala/com/github/unchama/datarepository/bukkit/player/BukkitRepositoryControls.scala b/src/main/scala/com/github/unchama/datarepository/bukkit/player/BukkitRepositoryControls.scala index b2ff7909ce..6a2fe56b60 100644 --- a/src/main/scala/com/github/unchama/datarepository/bukkit/player/BukkitRepositoryControls.scala +++ b/src/main/scala/com/github/unchama/datarepository/bukkit/player/BukkitRepositoryControls.scala @@ -12,6 +12,7 @@ import org.bukkit.event.player.{AsyncPlayerPreLoginEvent, PlayerJoinEvent} import org.bukkit.event.{EventHandler, EventPriority, Listener} import java.util.UUID +import scala.annotation.tailrec import scala.collection.concurrent.TrieMap case class BukkitRepositoryControls[F[_], R](repository: PlayerDataRepository[R], @@ -27,6 +28,11 @@ case class BukkitRepositoryControls[F[_], R](repository: PlayerDataRepository[R] finalizer.transformContext(trans) ) + import cats.implicits._ + + def map[S](f: R => S): BukkitRepositoryControls[F, S] = + BukkitRepositoryControls(repository.map(f), initializer, backupProcess, finalizer) + def coerceFinalizationContextTo[G[_] : ContextCoercion[F, *[_]]]: BukkitRepositoryControls[G, R] = transformFinalizationContext(ContextCoercion.asFunctionK) } @@ -159,7 +165,7 @@ object BukkitRepositoryControls { import cats.implicits._ definition match { - case RepositoryDefinition.SinglePhased(initialization, tappingAction, finalization) => Sync[F].delay { + case RepositoryDefinition.Phased.SinglePhased(initialization, tappingAction, finalization) => Sync[F].delay { TrieMap.empty[UUID, R] }.map { dataMap => // workaround of https://youtrack.jetbrains.com/issue/SCL-18638 @@ -173,7 +179,7 @@ object BukkitRepositoryControls { ) } - case RepositoryDefinition.TwoPhased(initialization, finalization) => Sync[F].delay { + case RepositoryDefinition.Phased.TwoPhased(initialization, finalization) => Sync[F].delay { (TrieMap.empty[Player, R], TrieMap.empty[UUID, initialization.IntermediateData]) }.map { case (dataMap, temporaryDataMap) => // workaround of https://youtrack.jetbrains.com/issue/SCL-18638 @@ -186,6 +192,9 @@ object BukkitRepositoryControls { Finalizers.twoPhased(finalization)(temporaryDataMap, dataMap) ) } + + case rd: RepositoryDefinition.Mapped[F, Player, s, R] => + createHandles[F, s](rd.source).map(_.map(rd.sr)) } } } diff --git a/src/main/scala/com/github/unchama/datarepository/definitions/FiberAdjoinedRepositoryDefinition.scala b/src/main/scala/com/github/unchama/datarepository/definitions/FiberAdjoinedRepositoryDefinition.scala index 11eb02fd31..e31890ecd3 100644 --- a/src/main/scala/com/github/unchama/datarepository/definitions/FiberAdjoinedRepositoryDefinition.scala +++ b/src/main/scala/com/github/unchama/datarepository/definitions/FiberAdjoinedRepositoryDefinition.scala @@ -16,7 +16,7 @@ object FiberAdjoinedRepositoryDefinition { G[_] : Sync, F[_] : ConcurrentEffect, Player, R - ](definition: RepositoryDefinition[G, Player, R]): definition.Self[R FiberAdjoined F] = + ](definition: RepositoryDefinition.Phased[G, Player, R]): definition.Self[R FiberAdjoined F] = definition .flatXmapWithIntermediateEffects(r => Deferred diff --git a/src/main/scala/com/github/unchama/datarepository/definitions/MutexRepositoryDefinition.scala b/src/main/scala/com/github/unchama/datarepository/definitions/MutexRepositoryDefinition.scala index b906ee53fd..65056f811c 100644 --- a/src/main/scala/com/github/unchama/datarepository/definitions/MutexRepositoryDefinition.scala +++ b/src/main/scala/com/github/unchama/datarepository/definitions/MutexRepositoryDefinition.scala @@ -9,7 +9,7 @@ object MutexRepositoryDefinition { def over[ F[_] : Concurrent, G[_] : Sync : ContextCoercion[*[_], F], Player, R - ](underlying: RepositoryDefinition[G, Player, R]): underlying.Self[Mutex[F, G, R]] = + ](underlying: RepositoryDefinition.Phased[G, Player, R]): underlying.Self[Mutex[F, G, R]] = underlying.flatXmap(r => Mutex.of[F, G, R](r))(_.readLatest) } diff --git a/src/main/scala/com/github/unchama/datarepository/definitions/RefDictBackedRepositoryDefinition.scala b/src/main/scala/com/github/unchama/datarepository/definitions/RefDictBackedRepositoryDefinition.scala index 7c51dd144f..3cf688e81b 100644 --- a/src/main/scala/com/github/unchama/datarepository/definitions/RefDictBackedRepositoryDefinition.scala +++ b/src/main/scala/com/github/unchama/datarepository/definitions/RefDictBackedRepositoryDefinition.scala @@ -14,7 +14,7 @@ object RefDictBackedRepositoryDefinition { def usingUuidRefDictWithEffectfulDefault[ F[_] : Monad, Player, R - ](refDict: RefDict[F, UUID, R])(getDefaultValue: F[R]): RepositoryDefinition.SinglePhased[F, Player, R] = { + ](refDict: RefDict[F, UUID, R])(getDefaultValue: F[R]): RepositoryDefinition.Phased.SinglePhased[F, Player, R] = { val initialization: SinglePhasedRepositoryInitialization[F, R] = (uuid, _) => refDict .read(uuid) @@ -27,12 +27,12 @@ object RefDictBackedRepositoryDefinition { val finalization: RepositoryFinalization[F, UUID, R] = RepositoryFinalization.withoutAnyFinalization((uuid, r) => refDict.write(uuid, r)) - RepositoryDefinition.SinglePhased.withoutTappingAction(initialization, finalization) + RepositoryDefinition.Phased.SinglePhased.withoutTappingAction(initialization, finalization) } def usingUuidRefDict[F[_] : Monad, Player, R](refDict: RefDict[F, UUID, R]) - (defaultValue: R): RepositoryDefinition.SinglePhased[F, Player, R] = + (defaultValue: R): RepositoryDefinition.Phased.SinglePhased[F, Player, R] = usingUuidRefDictWithEffectfulDefault(refDict)(Monad[F].pure(defaultValue)) } diff --git a/src/main/scala/com/github/unchama/datarepository/definitions/SessionMutexRepositoryDefinition.scala b/src/main/scala/com/github/unchama/datarepository/definitions/SessionMutexRepositoryDefinition.scala index 592b547e59..a5185319fa 100644 --- a/src/main/scala/com/github/unchama/datarepository/definitions/SessionMutexRepositoryDefinition.scala +++ b/src/main/scala/com/github/unchama/datarepository/definitions/SessionMutexRepositoryDefinition.scala @@ -19,7 +19,7 @@ object SessionMutexRepositoryDefinition { G[_] : Sync : ContextCoercion[*[_], F], Player ]: RepositoryDefinition[G, Player, SessionMutex[F, G]] = { - RepositoryDefinition.SinglePhased.withoutTappingAction( + RepositoryDefinition.Phased.SinglePhased.withoutTappingAction( SinglePhasedRepositoryInitialization.withSupplier(SessionMutex.newIn[F, G]), RepositoryFinalization.withoutAnyPersistence[G, UUID, SessionMutex[F, G]] { (_, mutex) => EffectExtra.runAsyncAndForget[F, G, Unit] { diff --git a/src/main/scala/com/github/unchama/datarepository/definitions/SignallingRepositoryDefinition.scala b/src/main/scala/com/github/unchama/datarepository/definitions/SignallingRepositoryDefinition.scala index c7fa288528..7305b2a397 100644 --- a/src/main/scala/com/github/unchama/datarepository/definitions/SignallingRepositoryDefinition.scala +++ b/src/main/scala/com/github/unchama/datarepository/definitions/SignallingRepositoryDefinition.scala @@ -20,9 +20,8 @@ object SignallingRepositoryDefinition { F[_] : ConcurrentEffect : ContextCoercion[G, *[_]] : ErrorLogger, Player: HasUuid, T ](publishSink: Pipe[F, (Player, T), Unit]) - (definition: RepositoryDefinition[G, Player, T]): RepositoryDefinition[G, Player, Ref[G, T]] = { - definition.toTwoPhased.flatXmapWithPlayer { player => - initialValue => + (definition: RepositoryDefinition.Phased[G, Player, T]): RepositoryDefinition[G, Player, Ref[G, T]] = { + definition.toTwoPhased.flatXmapWithPlayer { player => initialValue => AsymmetricSignallingRef[G, F, T](initialValue) .flatTap { ref => EffectExtra.runAsyncAndForget[F, G, Unit] { diff --git a/src/main/scala/com/github/unchama/datarepository/template/RepositoryDefinition.scala b/src/main/scala/com/github/unchama/datarepository/template/RepositoryDefinition.scala index 0d7b2e07dd..c5d9286daa 100644 --- a/src/main/scala/com/github/unchama/datarepository/template/RepositoryDefinition.scala +++ b/src/main/scala/com/github/unchama/datarepository/template/RepositoryDefinition.scala @@ -10,106 +10,117 @@ import com.github.unchama.minecraft.algebra.HasUuid import java.util.UUID sealed trait RepositoryDefinition[F[_], Player, R] { + final def map[S](f: R => S): RepositoryDefinition[F, Player, S] = RepositoryDefinition.Mapped(this, f) +} - type Self[S] <: RepositoryDefinition[F, Player, S] +object RepositoryDefinition { - def toTwoPhased(implicit F: Monad[F], playerHasUuid: HasUuid[Player]): RepositoryDefinition.TwoPhased[F, Player, R] = - this match { - case s@RepositoryDefinition.SinglePhased(_, _, _) => s.augmentToTwoPhased((_, r) => F.pure(r))(F.pure[R]) - case t@RepositoryDefinition.TwoPhased(_, _) => t - } + import cats.implicits._ - def flatXmapWithIntermediateEffects[S](f: R => F[S]) - (beforePersisting: S => F[R])(beforeFinalization: S => F[R]) - (implicit F: Monad[F]): Self[S] + /** + * データ生成処理と終了処理についての操作権を持つ [[RepositoryDefinition]]。 + */ + sealed trait Phased[F[_], Player, R] extends RepositoryDefinition[F, Player, R] { + type Self[S] <: RepositoryDefinition[F, Player, S] - def flatXmap[S](f: R => F[S])(g: S => F[R])(implicit F: Monad[F]): Self[S] = - flatXmapWithIntermediateEffects(f)(g)(g) + def flatXmapWithIntermediateEffects[S](f: R => F[S]) + (beforePersisting: S => F[R])(beforeFinalization: S => F[R]) + (implicit F: Monad[F]): Self[S] - def xmap[S](f: R => S)(g: S => R)(implicit F: Monad[F]): Self[S] = - flatXmap(r => F.pure(f(r)))(s => F.pure(g(s))) + final def toTwoPhased(implicit F: Monad[F], playerHasUuid: HasUuid[Player]): Phased.TwoPhased[F, Player, R] = + this match { + case s@Phased.SinglePhased(_, _, _) => s.augmentToTwoPhased((_, r) => F.pure(r))(F.pure[R]) + case t@Phased.TwoPhased(_, _) => t + } - def toRefRepository(implicit F: Sync[F]): Self[Ref[F, R]] = - flatXmap(r => Ref.of[F, R](r))(ref => ref.get) + final def flatXmap[S](f: R => F[S])(g: S => F[R])(implicit F: Monad[F]): Self[S] = + flatXmapWithIntermediateEffects(f)(g)(g) - def augmentF[S](f: R => F[S])(implicit F: Monad[F]): Self[(R, S)] = - flatXmap(r => F.map(f(r))(s => (r, s)))(rs => F.pure(rs._1)) -} + final def xmap[S](f: R => S)(g: S => R)(implicit F: Monad[F]): Self[S] = + flatXmap(r => F.pure(f(r)))(s => F.pure(g(s))) -object RepositoryDefinition { + final def toRefRepository(implicit F: Sync[F]): Self[Ref[F, R]] = + flatXmap(r => Ref.of[F, R](r))(ref => ref.get) - import cats.implicits._ + final def augmentF[S](f: R => F[S])(implicit F: Monad[F]): Self[(R, S)] = + flatXmap(r => F.map(f(r))(s => (r, s)))(rs => F.pure(rs._1)) + } - case class SinglePhased[F[_], Player, R](initialization: SinglePhasedRepositoryInitialization[F, R], - tappingAction: (Player, R) => F[Unit], - finalization: RepositoryFinalization[F, UUID, R]) - extends RepositoryDefinition[F, Player, R] { + object Phased { - override type Self[S] = SinglePhased[F, Player, S] + case class SinglePhased[F[_], Player, R](initialization: SinglePhasedRepositoryInitialization[F, R], + tappingAction: (Player, R) => F[Unit], + finalization: RepositoryFinalization[F, UUID, R]) + extends Phased[F, Player, R] { - override def flatXmapWithIntermediateEffects[S](f: R => F[S]) - (beforePersisting: S => F[R])(beforeFinalization: S => F[R]) - (implicit F: Monad[F]): SinglePhased[F, Player, S] = - RepositoryDefinition.SinglePhased( - (uuid, name) => initialization.prepareData(uuid, name).flatMap(_.traverse(f)), - (player, s) => beforePersisting(s).flatMap(tappingAction(player, _)), - finalization.withIntermediateEffects(beforePersisting)(beforeFinalization) - ) + override type Self[S] = SinglePhased[F, Player, S] + + override def flatXmapWithIntermediateEffects[S](f: R => F[S]) + (beforePersisting: S => F[R])(beforeFinalization: S => F[R]) + (implicit F: Monad[F]): SinglePhased[F, Player, S] = + SinglePhased( + (uuid, name) => initialization.prepareData(uuid, name).flatMap(_.traverse(f)), + (player, s) => beforePersisting(s).flatMap(tappingAction(player, _)), + finalization.withIntermediateEffects(beforePersisting)(beforeFinalization) + ) - def withAnotherTappingAction[U](another: (Player, R) => F[U]) - (implicit F: Apply[F]): SinglePhased[F, Player, R] = this.copy( - tappingAction = (player, r) => F.productR(tappingAction(player, r))(another(player, r).as(())) - ) - - def augmentToTwoPhased[T](prepareFinalData: (Player, R) => F[T])(revertOnFinalization: T => F[R]) - (implicit F: Monad[F], playerHasUuid: HasUuid[Player]): TwoPhased[F, Player, T] = - TwoPhased( - TwoPhasedRepositoryInitialization.augment(initialization)((player, r) => - tappingAction(player, r) >> prepareFinalData(player, r) - ), - finalization - .withIntermediateEffect(revertOnFinalization) - .contraMapKey(playerHasUuid.asFunction) + def withAnotherTappingAction[U](another: (Player, R) => F[U]) + (implicit F: Apply[F]): SinglePhased[F, Player, R] = this.copy( + tappingAction = (player, r) => F.productR(tappingAction(player, r))(another(player, r).as(())) ) - } - object SinglePhased { - def withoutTappingAction[ - F[_] : Applicative, Player, R - ](initialization: SinglePhasedRepositoryInitialization[F, R], - finalization: RepositoryFinalization[F, UUID, R]): SinglePhased[F, Player, R] = { - SinglePhased(initialization, (_, _) => Applicative[F].unit, finalization) + def augmentToTwoPhased[T](prepareFinalData: (Player, R) => F[T])(revertOnFinalization: T => F[R]) + (implicit F: Monad[F], playerHasUuid: HasUuid[Player]): TwoPhased[F, Player, T] = + TwoPhased( + TwoPhasedRepositoryInitialization.augment(initialization)((player, r) => + tappingAction(player, r) >> prepareFinalData(player, r) + ), + finalization + .withIntermediateEffect(revertOnFinalization) + .contraMapKey(playerHasUuid.asFunction) + ) } - def trivial[F[_] : Applicative, Player]: SinglePhased[F, Player, Unit] = withoutTappingAction( - SinglePhasedRepositoryInitialization.constant(()), - RepositoryFinalization.trivial - ) - - def withSupplierAndTrivialFinalization[F[_] : Monad, Player, R](supplier: F[R]): SinglePhased[F, Player, R] = - trivial[F, Player].flatXmap(_ => supplier)(_ => Applicative[F].unit) - } - - case class TwoPhased[F[_], Player, R](initialization: TwoPhasedRepositoryInitialization[F, Player, R], - finalization: RepositoryFinalization[F, Player, R]) - extends RepositoryDefinition[F, Player, R] { - override type Self[S] = TwoPhased[F, Player, S] - - def flatXmapWithPlayerAndIntermediateEffects[S](f: Player => R => F[S]) - (beforePersisting: S => F[R])(beforeFinalization: S => F[R]) - (implicit F: Monad[F]): TwoPhased[F, Player, S] = - RepositoryDefinition.TwoPhased( - initialization.extendPreparation(f), - finalization.withIntermediateEffects(beforePersisting)(beforeFinalization) + object SinglePhased { + def withoutTappingAction[ + F[_] : Applicative, Player, R + ](initialization: SinglePhasedRepositoryInitialization[F, R], + finalization: RepositoryFinalization[F, UUID, R]): SinglePhased[F, Player, R] = { + SinglePhased(initialization, (_, _) => Applicative[F].unit, finalization) + } + + def trivial[F[_] : Applicative, Player]: SinglePhased[F, Player, Unit] = withoutTappingAction( + SinglePhasedRepositoryInitialization.constant(()), + RepositoryFinalization.trivial ) - override def flatXmapWithIntermediateEffects[S](f: R => F[S]) - (beforePersisting: S => F[R])(beforeFinalization: S => F[R]) - (implicit F: Monad[F]): TwoPhased[F, Player, S] = - flatXmapWithPlayerAndIntermediateEffects(_ => f)(beforePersisting)(beforeFinalization) + def withSupplierAndTrivialFinalization[F[_] : Monad, Player, R](supplier: F[R]): SinglePhased[F, Player, R] = + trivial[F, Player].flatXmap(_ => supplier)(_ => Applicative[F].unit) + } - def flatXmapWithPlayer[S](f: Player => R => F[S])(g: S => F[R])(implicit F: Monad[F]): TwoPhased[F, Player, S] = - flatXmapWithPlayerAndIntermediateEffects(f)(g)(g) + case class TwoPhased[F[_], Player, R](initialization: TwoPhasedRepositoryInitialization[F, Player, R], + finalization: RepositoryFinalization[F, Player, R]) + extends Phased[F, Player, R] { + override type Self[S] = TwoPhased[F, Player, S] + + def flatXmapWithPlayerAndIntermediateEffects[S](f: Player => R => F[S]) + (beforePersisting: S => F[R])(beforeFinalization: S => F[R]) + (implicit F: Monad[F]): TwoPhased[F, Player, S] = + TwoPhased( + initialization.extendPreparation(f), + finalization.withIntermediateEffects(beforePersisting)(beforeFinalization) + ) + + override def flatXmapWithIntermediateEffects[S](f: R => F[S]) + (beforePersisting: S => F[R])(beforeFinalization: S => F[R]) + (implicit F: Monad[F]): TwoPhased[F, Player, S] = + flatXmapWithPlayerAndIntermediateEffects(_ => f)(beforePersisting)(beforeFinalization) + + def flatXmapWithPlayer[S](f: Player => R => F[S])(g: S => F[R])(implicit F: Monad[F]): TwoPhased[F, Player, S] = + flatXmapWithPlayerAndIntermediateEffects(f)(g)(g) + } } + case class Mapped[F[_], Player, S, R](source: RepositoryDefinition[F, Player, S], sr: S => R) + extends RepositoryDefinition[F, Player, R] } diff --git a/src/main/scala/com/github/unchama/seichiassist/SeichiAssist.scala b/src/main/scala/com/github/unchama/seichiassist/SeichiAssist.scala index 9a4674a6fa..ef16e38c9d 100644 --- a/src/main/scala/com/github/unchama/seichiassist/SeichiAssist.scala +++ b/src/main/scala/com/github/unchama/seichiassist/SeichiAssist.scala @@ -111,7 +111,7 @@ class SeichiAssist extends JavaPlugin() { private val activeSkillAvailabilityRepositoryControls: BukkitRepositoryControls[SyncIO, Ref[SyncIO, Boolean]] = BukkitRepositoryControls.createHandles[SyncIO, Ref[SyncIO, Boolean]]( - RepositoryDefinition.SinglePhased.withoutTappingAction( + RepositoryDefinition.Phased.SinglePhased.withoutTappingAction( SinglePhasedRepositoryInitialization.withSupplier(Ref[SyncIO].of(true)), RepositoryFinalization.trivial ) diff --git a/src/main/scala/com/github/unchama/seichiassist/subsystems/breakcountbar/System.scala b/src/main/scala/com/github/unchama/seichiassist/subsystems/breakcountbar/System.scala index c698017b51..0cd44c3496 100644 --- a/src/main/scala/com/github/unchama/seichiassist/subsystems/breakcountbar/System.scala +++ b/src/main/scala/com/github/unchama/seichiassist/subsystems/breakcountbar/System.scala @@ -56,7 +56,7 @@ object System { expBarSynchronizationRepositoryHandles <- { ContextCoercion { BukkitRepositoryControls.createHandles( - RepositoryDefinition.TwoPhased( + RepositoryDefinition.Phased.TwoPhased( ExpBarSynchronizationRepositoryTemplate.initialization[G, F, Player]( breakCountReadAPI.seichiAmountUpdates, visibilityValues diff --git a/src/main/scala/com/github/unchama/seichiassist/subsystems/buildcount/System.scala b/src/main/scala/com/github/unchama/seichiassist/subsystems/buildcount/System.scala index 83770be20b..0a9c595add 100644 --- a/src/main/scala/com/github/unchama/seichiassist/subsystems/buildcount/System.scala +++ b/src/main/scala/com/github/unchama/seichiassist/subsystems/buildcount/System.scala @@ -46,7 +46,7 @@ object System { for { rateLimiterRepositoryControls <- BukkitRepositoryControls.createHandles( - RepositoryDefinition.SinglePhased.withoutTappingAction( + RepositoryDefinition.Phased.SinglePhased.withoutTappingAction( RateLimiterRepositoryDefinitions.initialization[F, G], RateLimiterRepositoryDefinitions.finalization[G, UUID] ) diff --git a/src/main/scala/com/github/unchama/seichiassist/subsystems/fastdiggingeffect/System.scala b/src/main/scala/com/github/unchama/seichiassist/subsystems/fastdiggingeffect/System.scala index d3fde4ba96..cafce8162b 100644 --- a/src/main/scala/com/github/unchama/seichiassist/subsystems/fastdiggingeffect/System.scala +++ b/src/main/scala/com/github/unchama/seichiassist/subsystems/fastdiggingeffect/System.scala @@ -76,7 +76,7 @@ object System { effectListRepositoryHandles <- { ContextCoercion { BukkitRepositoryControls.createHandles( - RepositoryDefinition.SinglePhased( + RepositoryDefinition.Phased.SinglePhased( EffectListRepositoryDefinitions.initialization[F, G], EffectListRepositoryDefinitions.tappingAction[F, G, Player](effectListTopic), EffectListRepositoryDefinitions.finalization[F, G, UUID] diff --git a/src/main/scala/com/github/unchama/seichiassist/subsystems/itemmigration/System.scala b/src/main/scala/com/github/unchama/seichiassist/subsystems/itemmigration/System.scala index cc0f38b04f..1125f96635 100644 --- a/src/main/scala/com/github/unchama/seichiassist/subsystems/itemmigration/System.scala +++ b/src/main/scala/com/github/unchama/seichiassist/subsystems/itemmigration/System.scala @@ -49,7 +49,7 @@ object System { repositoryControls <- BukkitRepositoryControls.createHandles( - RepositoryDefinition.SinglePhased.withoutTappingAction( + RepositoryDefinition.Phased.SinglePhased.withoutTappingAction( ItemMigrationStateRepositoryDefinitions.initialization[G], ItemMigrationStateRepositoryDefinitions.finalization[G, UUID] ) diff --git a/src/main/scala/com/github/unchama/seichiassist/subsystems/mana/application/ManaRepositoryDefinition.scala b/src/main/scala/com/github/unchama/seichiassist/subsystems/mana/application/ManaRepositoryDefinition.scala index 5049aca0d5..c19af10014 100644 --- a/src/main/scala/com/github/unchama/seichiassist/subsystems/mana/application/ManaRepositoryDefinition.scala +++ b/src/main/scala/com/github/unchama/seichiassist/subsystems/mana/application/ManaRepositoryDefinition.scala @@ -24,7 +24,7 @@ object ManaRepositoryDefinition { (implicit breakCountReadAPI: BreakCountReadAPI[F, G, Player]) : RepositoryDefinition[G, Player, Ref[G, LevelCappedManaAmount]] = { - val valueRepository: RepositoryDefinition.TwoPhased[G, Player, LevelCappedManaAmount] = + val valueRepository: RepositoryDefinition.Phased.TwoPhased[G, Player, LevelCappedManaAmount] = RefDictBackedRepositoryDefinition .usingUuidRefDict[G, Player, ManaAmount](persistence)(ManaAmount(0)) .toTwoPhased diff --git a/src/main/scala/com/github/unchama/seichiassist/subsystems/manabar/application/ManaBarSynchronizationRepository.scala b/src/main/scala/com/github/unchama/seichiassist/subsystems/manabar/application/ManaBarSynchronizationRepository.scala index 78278ec8e6..340532d425 100644 --- a/src/main/scala/com/github/unchama/seichiassist/subsystems/manabar/application/ManaBarSynchronizationRepository.scala +++ b/src/main/scala/com/github/unchama/seichiassist/subsystems/manabar/application/ManaBarSynchronizationRepository.scala @@ -24,7 +24,7 @@ object ManaBarSynchronizationRepository { ](manaValues: fs2.Stream[F, (Player, LevelCappedManaAmount)]) (createFreshBossBar: G[BossBarWithPlayer[F, Player]]): RepositoryDefinition[G, Player, _] = { FiberAdjoinedRepositoryDefinition.extending { - RepositoryDefinition.SinglePhased + RepositoryDefinition.Phased.SinglePhased .withSupplierAndTrivialFinalization[G, Player, BossBarWithPlayer[F, Player]](createFreshBossBar) }.withAnotherTappingAction { (player, pair) => val (bossBar, promise) = pair diff --git a/src/main/scala/com/github/unchama/seichiassist/subsystems/mebius/System.scala b/src/main/scala/com/github/unchama/seichiassist/subsystems/mebius/System.scala index 634d182c8b..fb9b68bffd 100644 --- a/src/main/scala/com/github/unchama/seichiassist/subsystems/mebius/System.scala +++ b/src/main/scala/com/github/unchama/seichiassist/subsystems/mebius/System.scala @@ -42,7 +42,7 @@ object System { implicit val randomEffect: RandomEffect[G] = RandomEffect.createFromRandom(Random) BukkitRepositoryControls.createHandles( - RepositoryDefinition.TwoPhased( + RepositoryDefinition.Phased.TwoPhased( SpeechServiceRepositoryDefinitions.initialization[SyncIO, Player], SpeechServiceRepositoryDefinitions.finalization[SyncIO, Player] ) @@ -51,7 +51,7 @@ object System { speechServiceRepositoryControls.repository BukkitRepositoryControls.createHandles( - RepositoryDefinition.TwoPhased( + RepositoryDefinition.Phased.TwoPhased( MebiusSpeechRoutineFiberRepositoryDefinitions.initialization[SyncIO], MebiusSpeechRoutineFiberRepositoryDefinitions.finalization[SyncIO, Player] ) From e1c4682bf7a2841e506fc5b782996555b1879d64 Mon Sep 17 00:00:00 2001 From: Kory <6561358+kory33@users.noreply.github.com> Date: Wed, 28 Jul 2021 09:31:40 +0900 Subject: [PATCH 2/7] =?UTF-8?q?[update]=20SignallingRepositoryDefinition?= =?UTF-8?q?=E3=81=AE=E3=83=A1=E3=83=A2=E3=83=AA=E3=83=AA=E3=83=BC=E3=82=AF?= =?UTF-8?q?=E3=82=92=E4=BF=AE=E6=AD=A3=E3=81=99=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../SignallingRepositoryDefinition.scala | 42 ++++++++++++------- ...untBarVisibilityRepositoryDefinition.scala | 2 +- .../ManaRepositoryDefinition.scala | 2 +- 3 files changed, 28 insertions(+), 18 deletions(-) diff --git a/src/main/scala/com/github/unchama/datarepository/definitions/SignallingRepositoryDefinition.scala b/src/main/scala/com/github/unchama/datarepository/definitions/SignallingRepositoryDefinition.scala index 7305b2a397..d57c27ddd2 100644 --- a/src/main/scala/com/github/unchama/datarepository/definitions/SignallingRepositoryDefinition.scala +++ b/src/main/scala/com/github/unchama/datarepository/definitions/SignallingRepositoryDefinition.scala @@ -1,8 +1,9 @@ package com.github.unchama.datarepository.definitions import cats.effect.concurrent.Ref -import cats.effect.{ConcurrentEffect, Sync} +import cats.effect.{Concurrent, ConcurrentEffect, Sync} import com.github.unchama.datarepository.template.RepositoryDefinition +import com.github.unchama.datarepository.template.RepositoryDefinition.Phased import com.github.unchama.generic.ContextCoercion import com.github.unchama.generic.effect.EffectExtra import com.github.unchama.generic.effect.concurrent.AsymmetricSignallingRef @@ -13,33 +14,42 @@ import io.chrisdavenport.log4cats.ErrorLogger object SignallingRepositoryDefinition { + import FiberAdjoinedRepositoryDefinition.FiberAdjoined import cats.implicits._ - def forPlayerTopic[ + def withPublishSink[ G[_] : Sync, F[_] : ConcurrentEffect : ContextCoercion[G, *[_]] : ErrorLogger, Player: HasUuid, T ](publishSink: Pipe[F, (Player, T), Unit]) - (definition: RepositoryDefinition.Phased[G, Player, T]): RepositoryDefinition[G, Player, Ref[G, T]] = { - definition.toTwoPhased.flatXmapWithPlayer { player => initialValue => + (definition: RepositoryDefinition.Phased[G, Player, T]): Phased.TwoPhased[G, Player, Ref[G, T] FiberAdjoined F] = { + FiberAdjoinedRepositoryDefinition.extending(definition.toTwoPhased).flatXmapWithPlayer { + player => { case (initialValue, fiberPromise) => AsymmetricSignallingRef[G, F, T](initialValue) .flatTap { ref => EffectExtra.runAsyncAndForget[F, G, Unit] { - ref - .valuesAwait - .use { stream => - // FIXME: This *never* returns. It is likely that this is not garbage collected. - // We might need to find a way to - // - restart the stream when the downstream stream fails - // - unsubscribe when the player exits - // We should be able to achieve this by returning a CancelToken or something on this flatXmapWithPlayer - StreamExtra.compileToRestartingStream("[SignallingRepositoryDefinition]") { - stream.map(player -> _).through(publishSink) + Concurrent[F].start[Nothing] { + ref + .valuesAwait + .use[F, Nothing] { stream => + StreamExtra.compileToRestartingStream[F, Nothing]("[SignallingRepositoryDefinition]") { + stream.map(player -> _).through(publishSink) + } } - } + } >>= fiberPromise.complete } } .widen[Ref[G, T]] - } { ref => ref.get } + .map(_ -> fiberPromise) + } + } { case (ref, fiberPromise) => ref.get.map(_ -> fiberPromise) } } + + def withPublishSinkHidden[ + G[_] : Sync, + F[_] : ConcurrentEffect : ContextCoercion[G, *[_]] : ErrorLogger, + Player: HasUuid, T + ](publishSink: Pipe[F, (Player, T), Unit]) + (definition: RepositoryDefinition.Phased[G, Player, T]): RepositoryDefinition[G, Player, Ref[G, T]] = + withPublishSink(publishSink)(definition).map(_._1) } diff --git a/src/main/scala/com/github/unchama/seichiassist/subsystems/breakcountbar/application/BreakCountBarVisibilityRepositoryDefinition.scala b/src/main/scala/com/github/unchama/seichiassist/subsystems/breakcountbar/application/BreakCountBarVisibilityRepositoryDefinition.scala index be8986b375..5fd1dd1dc4 100644 --- a/src/main/scala/com/github/unchama/seichiassist/subsystems/breakcountbar/application/BreakCountBarVisibilityRepositoryDefinition.scala +++ b/src/main/scala/com/github/unchama/seichiassist/subsystems/breakcountbar/application/BreakCountBarVisibilityRepositoryDefinition.scala @@ -18,7 +18,7 @@ object BreakCountBarVisibilityRepositoryDefinition { Player: HasUuid, ](persistence: BreakCountBarVisibilityPersistence[G], publishChanges: Pipe[F, (Player, BreakCountBarVisibility), Unit]): RepositoryDefinition[G, Player, Ref[G, BreakCountBarVisibility]] = - SignallingRepositoryDefinition.forPlayerTopic(publishChanges) { + SignallingRepositoryDefinition.withPublishSinkHidden(publishChanges) { RefDictBackedRepositoryDefinition.usingUuidRefDict(persistence)(BreakCountBarVisibility.Shown) } diff --git a/src/main/scala/com/github/unchama/seichiassist/subsystems/mana/application/ManaRepositoryDefinition.scala b/src/main/scala/com/github/unchama/seichiassist/subsystems/mana/application/ManaRepositoryDefinition.scala index c19af10014..2cd0a5b79d 100644 --- a/src/main/scala/com/github/unchama/seichiassist/subsystems/mana/application/ManaRepositoryDefinition.scala +++ b/src/main/scala/com/github/unchama/seichiassist/subsystems/mana/application/ManaRepositoryDefinition.scala @@ -35,7 +35,7 @@ object ManaRepositoryDefinition { )(cappedMana => Monad[G].pure(cappedMana.manaAmount)) SignallingRepositoryDefinition - .forPlayerTopic[G, F, Player, LevelCappedManaAmount](publishChanges)(valueRepository) + .withPublishSinkHidden[G, F, Player, LevelCappedManaAmount](publishChanges)(valueRepository) } } From 72753b5b53c9f45c15e095b14cf70ce41b780ab5 Mon Sep 17 00:00:00 2001 From: Kory <6561358+kory33@users.noreply.github.com> Date: Thu, 29 Jul 2021 06:39:02 +0900 Subject: [PATCH 3/7] =?UTF-8?q?[fix]=20AsymmetricSignallingRef=E3=81=AE?= =?UTF-8?q?=E5=AE=9A=E7=BE=A9=E3=82=92=E3=82=88=E3=82=8A=E5=8E=B3=E5=AF=86?= =?UTF-8?q?=E3=81=AB=E3=81=97=E3=80=81=E3=83=86=E3=82=B9=E3=83=88=E3=82=92?= =?UTF-8?q?=E5=BC=B7=E5=8C=96=E3=81=99=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../concurrent/AsymmetricSignallingRef.scala | 12 +++++- .../effect/stream/ReorderingPipe.scala | 41 ++++--------------- .../AsymmetricSignallingRefSpec.scala | 8 ++-- .../effect/stream/ReorderingPipeSpec.scala | 5 +-- 4 files changed, 23 insertions(+), 43 deletions(-) diff --git a/src/main/scala/com/github/unchama/generic/effect/concurrent/AsymmetricSignallingRef.scala b/src/main/scala/com/github/unchama/generic/effect/concurrent/AsymmetricSignallingRef.scala index 01e5f4a279..8a425bd14c 100644 --- a/src/main/scala/com/github/unchama/generic/effect/concurrent/AsymmetricSignallingRef.scala +++ b/src/main/scala/com/github/unchama/generic/effect/concurrent/AsymmetricSignallingRef.scala @@ -22,8 +22,10 @@ abstract class AsymmetricSignallingRef[G[_], F[_], A] extends Ref[G, A] { /** * 更新値へのsubscriptionを [[Resource]] として表現したもの。 - * subscriptionが有効になった瞬間に [[Resource]] として利用可能になり、 + * subscriptionが有効になった後に [[Resource]] として利用可能になり、 * 利用が終了した後に自動的にunsubscribeされる。 + * + * [[Resource]] として利用可能である間は更新が行われた順に更新値が全て得られることが保証される。 */ val valuesAwait: Resource[F, Stream[F, A]] @@ -99,7 +101,13 @@ object AsymmetricSignallingRef { override val valuesAwait: Resource[F, Stream[F, A]] = changeTopic .subscribeAwait(topicQueueSize) - .map(_.through(ReorderingPipe[F, A])) + .flatMap { subscription => + Resource.liftF { + state.get.map { currentValue => + subscription.through(ReorderingPipe.withInitialToken[F, A](currentValue.nextStamp)) + } + }.mapK(GToF) + } override def get: G[A] = state.get.map(_.value) diff --git a/src/main/scala/com/github/unchama/generic/effect/stream/ReorderingPipe.scala b/src/main/scala/com/github/unchama/generic/effect/stream/ReorderingPipe.scala index 53947c8fb1..bf71fea41a 100644 --- a/src/main/scala/com/github/unchama/generic/effect/stream/ReorderingPipe.scala +++ b/src/main/scala/com/github/unchama/generic/effect/stream/ReorderingPipe.scala @@ -1,6 +1,5 @@ package com.github.unchama.generic.effect.stream -import cats.{Id, Monad} import com.github.unchama.generic.Token import fs2.{Chunk, Pipe, Stream} @@ -17,7 +16,7 @@ object ReorderingPipe { case class TimeStamped[+A](currentStamp: Token, nextStamp: Token, value: A) /** - * [[ReorderingPipe.apply]] で返される[[Pipe]]が使用する内部状態。 + * ReorderingPipeの内部状態。 * * @param nextToken 次の値のタイムスタンプ * @param waitMap [[TimeStamped.currentStamp]]をキーに、パイプにすでに到着した値とその次のタイムスタンプを保持する [[Map]] @@ -45,38 +44,12 @@ object ReorderingPipe { /** * シーケンスされたタイムスタンプ付きの値を流す [[Stream]] を並び替える [[Pipe]]。 * - * 与えられたストリームの最初のChunkの極小の [[TimeStamped.currentStamp]] よりも - * タイムスタンプが古い要素は返されるストリームに出力されない。 + * `token` よりもタイムスタンプが古い要素は返されるストリームに出力されない。 + * + * @param token 出力 [[Stream]] の最初の要素となることを期待される入力が持つタイムスタンプ */ - def apply[F[_], A]: Pipe[F, TimeStamped[A], A] = - in => StreamExtra.uncons(in).flatMap { case (firstChunk, rest) => - val vector = firstChunk.toVector // nonempty - val nextTokenMap: Map[Token, TimeStamped[A]] = vector.map(t => (t.nextStamp, t)).toMap - val minimalTimeStamp = { - @tailrec - def go(currentCandidate: TimeStamped[A]): TimeStamped[A] = - nextTokenMap.get(currentCandidate.currentStamp) match { - case Some(value) => go(value) - case None => currentCandidate - } - - go(vector.head) - } - - val (initialWaitMap, initialChunk) = { - val remainingValuesInFirstChunk = nextTokenMap.removed(minimalTimeStamp.nextStamp).values - - WaitMap[A]( - minimalTimeStamp.nextStamp, - remainingValuesInFirstChunk - .map { case TimeStamped(currentStamp, nextStamp, value) => - (currentStamp, (value, nextStamp)) - } - .toMap - ).flushWith(Chunk.empty) - } - - Stream.emit(minimalTimeStamp.value) ++ Stream.chunk(initialChunk) ++ - rest.scanChunks(initialWaitMap) { case (waitMap, nextChunk) => waitMap.flushWith(nextChunk) } + def withInitialToken[F[_], A](token: Token): Pipe[F, TimeStamped[A], A] = + _.scanChunks(WaitMap[A](token, Map.empty)) { case (waitMap, nextChunk) => + waitMap.flushWith(nextChunk) } } diff --git a/src/test/scala/com/github/unchama/generic/effect/concurrent/AsymmetricSignallingRefSpec.scala b/src/test/scala/com/github/unchama/generic/effect/concurrent/AsymmetricSignallingRefSpec.scala index 8194b2ea58..a6b7a2f9a1 100644 --- a/src/test/scala/com/github/unchama/generic/effect/concurrent/AsymmetricSignallingRefSpec.scala +++ b/src/test/scala/com/github/unchama/generic/effect/concurrent/AsymmetricSignallingRefSpec.scala @@ -1,6 +1,6 @@ package com.github.unchama.generic.effect.concurrent -import cats.effect.{ContextShift, IO, SyncIO, Timer} +import cats.effect.{ContextShift, IO, Timer} import com.github.unchama.testutil.concurrent.tests.ConcurrentEffectTest import com.github.unchama.testutil.execution.MonixTestSchedulerTests import monix.eval.Task @@ -35,16 +35,16 @@ class AsymmetricSignallingRefSpec "signal all the changes" in { val initialValue: Value = 0 - forAll(minSuccessful(100)) { updates: List[Value] => + forAll(minSuccessful(10000)) { updates: List[Value] => val task = for { - ref <- AsymmetricSignallingRef.in[Task, SyncIO, Task, Value](initialValue) + ref <- AsymmetricSignallingRef.in[Task, Task, Task, Value](initialValue) updateResult <- ref .valuesAwait .use { stream => for { resultFiber <- stream.take(updates.length).compile.toList.start - _ <- Task.liftFrom[SyncIO].apply(updates.traverse(ref.set)) + _ <- updates.traverse(ref.set) result <- resultFiber.join } yield result } diff --git a/src/test/scala/com/github/unchama/generic/effect/stream/ReorderingPipeSpec.scala b/src/test/scala/com/github/unchama/generic/effect/stream/ReorderingPipeSpec.scala index da9274031b..e7bd1388a6 100644 --- a/src/test/scala/com/github/unchama/generic/effect/stream/ReorderingPipeSpec.scala +++ b/src/test/scala/com/github/unchama/generic/effect/stream/ReorderingPipeSpec.scala @@ -51,14 +51,13 @@ class ReorderingPipeSpec } val createRandomizedInput: SyncIO[Vector[TimeStamped[TestInputType]]] = SyncIO { - val Vector(a, b@_*) = timeStamped - a +: Random.shuffle(b.toVector) + Random.shuffle(timeStamped) } val program = fs2.Stream .evals(createRandomizedInput) - .through(ReorderingPipe[SyncIO, TestInputType]) + .through(ReorderingPipe.withInitialToken[SyncIO, TestInputType](timeStamped.head.currentStamp)) .compile .toList From 6559b4bd35ca5e6cce307e00010f7a5c8a863962 Mon Sep 17 00:00:00 2001 From: Kory <6561358+kory33@users.noreply.github.com> Date: Thu, 29 Jul 2021 06:42:23 +0900 Subject: [PATCH 4/7] =?UTF-8?q?[update=20doc]=20FIXME=E3=81=AE=E6=96=87?= =?UTF-8?q?=E8=A8=80=E3=82=92=E4=BF=AE=E6=AD=A3=E3=81=99=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/github/unchama/fs2/workaround/fs3/Fs3Channel.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main/scala/com/github/unchama/fs2/workaround/fs3/Fs3Channel.scala b/src/main/scala/com/github/unchama/fs2/workaround/fs3/Fs3Channel.scala index 46c05ebe53..f96d57c7a6 100644 --- a/src/main/scala/com/github/unchama/fs2/workaround/fs3/Fs3Channel.scala +++ b/src/main/scala/com/github/unchama/fs2/workaround/fs3/Fs3Channel.scala @@ -165,8 +165,7 @@ object Fs3Channel { // The latest implementation of fs2.concurrent.Channel would poll on the producer.get // to allow cancellation of the send action. This is achieved by the MonadCancel typeclass of // cats-effect 3, which is not available on cats-effect 2. - // However, on SeichiAssist, we hardly cancel any concurrent action, - // hence we are not allowing any cancellation to happen. + // We are temporarily not allowing any cancellation to happen here, but this can be improved. notifyStream(waiting) <* producer.get ) }.flatten From b2318ab2d7b34ea37b9a6fc8457ecef268667d5b Mon Sep 17 00:00:00 2001 From: Kory <6561358+kory33@users.noreply.github.com> Date: Thu, 29 Jul 2021 10:44:25 +0900 Subject: [PATCH 5/7] =?UTF-8?q?[update=20test]=20ReorderingPipe=E3=81=AE?= =?UTF-8?q?=E3=83=86=E3=82=B9=E3=83=88=E3=82=92=E3=82=88=E3=82=8A=E5=BC=B7?= =?UTF-8?q?=E5=9B=BA=E3=81=AA=E3=82=82=E3=81=AE=E3=81=AB=E3=81=99=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../unchama/generic/effect/stream/ReorderingPipeSpec.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/test/scala/com/github/unchama/generic/effect/stream/ReorderingPipeSpec.scala b/src/test/scala/com/github/unchama/generic/effect/stream/ReorderingPipeSpec.scala index e7bd1388a6..a88b34f10d 100644 --- a/src/test/scala/com/github/unchama/generic/effect/stream/ReorderingPipeSpec.scala +++ b/src/test/scala/com/github/unchama/generic/effect/stream/ReorderingPipeSpec.scala @@ -29,8 +29,7 @@ class ReorderingPipeSpec type TestInputType = Long "Reorder scrambled inputs as long as they are timestamped" in { - forAll { _: List[TestInputType] => - val input = List(0L, 0) + forAll(minSuccessful(100)) { input: List[TestInputType] => whenever(input.nonEmpty) { val timeStamped: Vector[TimeStamped[TestInputType]] = { val withCurrentStamps = input.map(input => (new Token, input)) From 1fcd3ea1b345d631e1b50af0391a5006879d3afe Mon Sep 17 00:00:00 2001 From: Kory <6561358+kory33@users.noreply.github.com> Date: Thu, 29 Jul 2021 11:03:16 +0900 Subject: [PATCH 6/7] =?UTF-8?q?[clean]=20EffectListRepositoryDefinitions?= =?UTF-8?q?=E3=81=AE=E3=82=B3=E3=83=BC=E3=83=89=E3=82=92=E7=B0=A1=E5=8D=98?= =?UTF-8?q?=E3=81=AB=E3=81=99=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../EffectListRepositoryDefinitions.scala | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/src/main/scala/com/github/unchama/seichiassist/subsystems/fastdiggingeffect/application/repository/EffectListRepositoryDefinitions.scala b/src/main/scala/com/github/unchama/seichiassist/subsystems/fastdiggingeffect/application/repository/EffectListRepositoryDefinitions.scala index ae6cb4331b..c1b39e6b25 100644 --- a/src/main/scala/com/github/unchama/seichiassist/subsystems/fastdiggingeffect/application/repository/EffectListRepositoryDefinitions.scala +++ b/src/main/scala/com/github/unchama/seichiassist/subsystems/fastdiggingeffect/application/repository/EffectListRepositoryDefinitions.scala @@ -2,6 +2,7 @@ package com.github.unchama.seichiassist.subsystems.fastdiggingeffect.application import cats.effect.concurrent.Deferred import cats.effect.{Concurrent, ConcurrentEffect, Effect, Fiber, Sync, SyncEffect, Timer} +import com.github.unchama.datarepository.definitions.FiberAdjoinedRepositoryDefinition.FiberAdjoined import com.github.unchama.datarepository.template.finalization.RepositoryFinalization import com.github.unchama.datarepository.template.initialization.{PrefetchResult, SinglePhasedRepositoryInitialization} import com.github.unchama.fs2.workaround.fs3.Fs3Topic @@ -22,7 +23,7 @@ object EffectListRepositoryDefinitions { /** * [[FastDiggingEffectList]] と、それを1秒ごとにトピックへ通知するファイバーの組。 */ - type RepositoryValue[F[_], G[_]] = (Mutex[F, G, FastDiggingEffectList], Deferred[F, Fiber[F, Nothing]]) + type RepositoryValue[F[_], G[_]] = Mutex[F, G, FastDiggingEffectList] FiberAdjoined F def initialization[ F[_] : Concurrent, @@ -39,20 +40,21 @@ object EffectListRepositoryDefinitions { F[_] : ConcurrentEffect : Timer : ErrorLogger, G[_] : SyncEffect : ContextCoercion[*[_], F], Player - ](effectTopic: Fs3Topic[F, Option[(Player, FastDiggingEffectList)]]): (Player, RepositoryValue[F, G]) => G[Unit] = - (player, value) => { - val (mutexRef, fiberPromise) = value - + ](effectTopic: Fs3Topic[F, Option[(Player, FastDiggingEffectList)]]): (Player, RepositoryValue[F, G]) => G[Unit] = { + case (player, (mutexRef, fiberPromise)) => val programToRun: F[Unit] = StreamExtra.compileToRestartingStream("[EffectListRepositoryDefinitions]") { fs2.Stream - .awakeEvery[F](1.second) - .evalMap[F, FastDiggingEffectList](_ => ContextCoercion(mutexRef.readLatest)) - .evalTap[F, Unit](effectList => effectTopic.publish1(Some(player, effectList)).void) + .fixedRate[F](1.second) + .evalMap { _ => + ContextCoercion(mutexRef.readLatest).flatMap { latestEffectList => + effectTopic.publish1(Some(player, latestEffectList)) + } + } }.start >>= fiberPromise.complete EffectExtra.runAsyncAndForget[F, G, Unit](programToRun) - } + } def finalization[ F[_] : Effect, From f9e2371f21a4ae588218d2bf37aeb80542e7a06e Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Thu, 29 Jul 2021 02:04:19 +0000 Subject: [PATCH 7/7] [bump] 12 -> 13 --- build.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index 9c4b4510a9..a6401c3cba 100644 --- a/build.sbt +++ b/build.sbt @@ -6,7 +6,7 @@ import java.io._ ThisBuild / scalaVersion := "2.13.1" // ThisBuild / version はGitHub Actionsによって自動更新される。 // 次の行は ThisBuild / version := "(\d*)" の形式でなければならない。 -ThisBuild / version := "12" +ThisBuild / version := "13" ThisBuild / organization := "click.seichi" ThisBuild / description := "ギガンティック☆整地鯖の独自要素を司るプラグイン"