Skip to content

Commit

Permalink
Merge pull request #1121 from GiganticMinecraft/develop
Browse files Browse the repository at this point in the history
バージョン 13 リリース
  • Loading branch information
kory33 authored Jul 29, 2021
2 parents 86b3943 + bb051c7 commit 4779400
Show file tree
Hide file tree
Showing 23 changed files with 182 additions and 173 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import java.io._
ThisBuild / scalaVersion := "2.13.1"
// ThisBuild / version はGitHub Actionsによって自動更新される。
// 次の行は ThisBuild / version := "(\d*)" の形式でなければならない。
ThisBuild / version := "12"
ThisBuild / version := "13"
ThisBuild / organization := "click.seichi"
ThisBuild / description := "ギガンティック☆整地鯖の独自要素を司るプラグイン"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import org.bukkit.event.player.{AsyncPlayerPreLoginEvent, PlayerJoinEvent}
import org.bukkit.event.{EventHandler, EventPriority, Listener}

import java.util.UUID
import scala.annotation.tailrec
import scala.collection.concurrent.TrieMap

case class BukkitRepositoryControls[F[_], R](repository: PlayerDataRepository[R],
Expand All @@ -27,6 +28,11 @@ case class BukkitRepositoryControls[F[_], R](repository: PlayerDataRepository[R]
finalizer.transformContext(trans)
)

import cats.implicits._

def map[S](f: R => S): BukkitRepositoryControls[F, S] =
BukkitRepositoryControls(repository.map(f), initializer, backupProcess, finalizer)

def coerceFinalizationContextTo[G[_] : ContextCoercion[F, *[_]]]: BukkitRepositoryControls[G, R] =
transformFinalizationContext(ContextCoercion.asFunctionK)
}
Expand Down Expand Up @@ -159,7 +165,7 @@ object BukkitRepositoryControls {
import cats.implicits._

definition match {
case RepositoryDefinition.SinglePhased(initialization, tappingAction, finalization) => Sync[F].delay {
case RepositoryDefinition.Phased.SinglePhased(initialization, tappingAction, finalization) => Sync[F].delay {
TrieMap.empty[UUID, R]
}.map { dataMap =>
// workaround of https://youtrack.jetbrains.com/issue/SCL-18638
Expand All @@ -173,7 +179,7 @@ object BukkitRepositoryControls {
)
}

case RepositoryDefinition.TwoPhased(initialization, finalization) => Sync[F].delay {
case RepositoryDefinition.Phased.TwoPhased(initialization, finalization) => Sync[F].delay {
(TrieMap.empty[Player, R], TrieMap.empty[UUID, initialization.IntermediateData])
}.map { case (dataMap, temporaryDataMap) =>
// workaround of https://youtrack.jetbrains.com/issue/SCL-18638
Expand All @@ -186,6 +192,9 @@ object BukkitRepositoryControls {
Finalizers.twoPhased(finalization)(temporaryDataMap, dataMap)
)
}

case rd: RepositoryDefinition.Mapped[F, Player, s, R] =>
createHandles[F, s](rd.source).map(_.map(rd.sr))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ object FiberAdjoinedRepositoryDefinition {
G[_] : Sync,
F[_] : ConcurrentEffect,
Player, R
](definition: RepositoryDefinition[G, Player, R]): definition.Self[R FiberAdjoined F] =
](definition: RepositoryDefinition.Phased[G, Player, R]): definition.Self[R FiberAdjoined F] =
definition
.flatXmapWithIntermediateEffects(r =>
Deferred
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ object MutexRepositoryDefinition {

def over[
F[_] : Concurrent, G[_] : Sync : ContextCoercion[*[_], F], Player, R
](underlying: RepositoryDefinition[G, Player, R]): underlying.Self[Mutex[F, G, R]] =
](underlying: RepositoryDefinition.Phased[G, Player, R]): underlying.Self[Mutex[F, G, R]] =
underlying.flatXmap(r => Mutex.of[F, G, R](r))(_.readLatest)

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ object RefDictBackedRepositoryDefinition {

def usingUuidRefDictWithEffectfulDefault[
F[_] : Monad, Player, R
](refDict: RefDict[F, UUID, R])(getDefaultValue: F[R]): RepositoryDefinition.SinglePhased[F, Player, R] = {
](refDict: RefDict[F, UUID, R])(getDefaultValue: F[R]): RepositoryDefinition.Phased.SinglePhased[F, Player, R] = {
val initialization: SinglePhasedRepositoryInitialization[F, R] =
(uuid, _) => refDict
.read(uuid)
Expand All @@ -27,12 +27,12 @@ object RefDictBackedRepositoryDefinition {
val finalization: RepositoryFinalization[F, UUID, R] =
RepositoryFinalization.withoutAnyFinalization((uuid, r) => refDict.write(uuid, r))

RepositoryDefinition.SinglePhased.withoutTappingAction(initialization, finalization)
RepositoryDefinition.Phased.SinglePhased.withoutTappingAction(initialization, finalization)
}


def usingUuidRefDict[F[_] : Monad, Player, R](refDict: RefDict[F, UUID, R])
(defaultValue: R): RepositoryDefinition.SinglePhased[F, Player, R] =
(defaultValue: R): RepositoryDefinition.Phased.SinglePhased[F, Player, R] =
usingUuidRefDictWithEffectfulDefault(refDict)(Monad[F].pure(defaultValue))

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ object SessionMutexRepositoryDefinition {
G[_] : Sync : ContextCoercion[*[_], F],
Player
]: RepositoryDefinition[G, Player, SessionMutex[F, G]] = {
RepositoryDefinition.SinglePhased.withoutTappingAction(
RepositoryDefinition.Phased.SinglePhased.withoutTappingAction(
SinglePhasedRepositoryInitialization.withSupplier(SessionMutex.newIn[F, G]),
RepositoryFinalization.withoutAnyPersistence[G, UUID, SessionMutex[F, G]] { (_, mutex) =>
EffectExtra.runAsyncAndForget[F, G, Unit] {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package com.github.unchama.datarepository.definitions

import cats.effect.concurrent.Ref
import cats.effect.{ConcurrentEffect, Sync}
import cats.effect.{Concurrent, ConcurrentEffect, Sync}
import com.github.unchama.datarepository.template.RepositoryDefinition
import com.github.unchama.datarepository.template.RepositoryDefinition.Phased
import com.github.unchama.generic.ContextCoercion
import com.github.unchama.generic.effect.EffectExtra
import com.github.unchama.generic.effect.concurrent.AsymmetricSignallingRef
Expand All @@ -13,34 +14,42 @@ import io.chrisdavenport.log4cats.ErrorLogger

object SignallingRepositoryDefinition {

import FiberAdjoinedRepositoryDefinition.FiberAdjoined
import cats.implicits._

def forPlayerTopic[
def withPublishSink[
G[_] : Sync,
F[_] : ConcurrentEffect : ContextCoercion[G, *[_]] : ErrorLogger,
Player: HasUuid, T
](publishSink: Pipe[F, (Player, T), Unit])
(definition: RepositoryDefinition[G, Player, T]): RepositoryDefinition[G, Player, Ref[G, T]] = {
definition.toTwoPhased.flatXmapWithPlayer { player =>
initialValue =>
(definition: RepositoryDefinition.Phased[G, Player, T]): Phased.TwoPhased[G, Player, Ref[G, T] FiberAdjoined F] = {
FiberAdjoinedRepositoryDefinition.extending(definition.toTwoPhased).flatXmapWithPlayer {
player => { case (initialValue, fiberPromise) =>
AsymmetricSignallingRef[G, F, T](initialValue)
.flatTap { ref =>
EffectExtra.runAsyncAndForget[F, G, Unit] {
ref
.valuesAwait
.use { stream =>
// FIXME: This *never* returns. It is likely that this is not garbage collected.
// We might need to find a way to
// - restart the stream when the downstream stream fails
// - unsubscribe when the player exits
// We should be able to achieve this by returning a CancelToken or something on this flatXmapWithPlayer
StreamExtra.compileToRestartingStream("[SignallingRepositoryDefinition]") {
stream.map(player -> _).through(publishSink)
Concurrent[F].start[Nothing] {
ref
.valuesAwait
.use[F, Nothing] { stream =>
StreamExtra.compileToRestartingStream[F, Nothing]("[SignallingRepositoryDefinition]") {
stream.map(player -> _).through(publishSink)
}
}
}
} >>= fiberPromise.complete
}
}
.widen[Ref[G, T]]
} { ref => ref.get }
.map(_ -> fiberPromise)
}
} { case (ref, fiberPromise) => ref.get.map(_ -> fiberPromise) }
}

def withPublishSinkHidden[
G[_] : Sync,
F[_] : ConcurrentEffect : ContextCoercion[G, *[_]] : ErrorLogger,
Player: HasUuid, T
](publishSink: Pipe[F, (Player, T), Unit])
(definition: RepositoryDefinition.Phased[G, Player, T]): RepositoryDefinition[G, Player, Ref[G, T]] =
withPublishSink(publishSink)(definition).map(_._1)
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,106 +10,117 @@ import com.github.unchama.minecraft.algebra.HasUuid
import java.util.UUID

sealed trait RepositoryDefinition[F[_], Player, R] {
final def map[S](f: R => S): RepositoryDefinition[F, Player, S] = RepositoryDefinition.Mapped(this, f)
}

type Self[S] <: RepositoryDefinition[F, Player, S]
object RepositoryDefinition {

def toTwoPhased(implicit F: Monad[F], playerHasUuid: HasUuid[Player]): RepositoryDefinition.TwoPhased[F, Player, R] =
this match {
case s@RepositoryDefinition.SinglePhased(_, _, _) => s.augmentToTwoPhased((_, r) => F.pure(r))(F.pure[R])
case t@RepositoryDefinition.TwoPhased(_, _) => t
}
import cats.implicits._

def flatXmapWithIntermediateEffects[S](f: R => F[S])
(beforePersisting: S => F[R])(beforeFinalization: S => F[R])
(implicit F: Monad[F]): Self[S]
/**
* データ生成処理と終了処理についての操作権を持つ [[RepositoryDefinition]]。
*/
sealed trait Phased[F[_], Player, R] extends RepositoryDefinition[F, Player, R] {
type Self[S] <: RepositoryDefinition[F, Player, S]

def flatXmap[S](f: R => F[S])(g: S => F[R])(implicit F: Monad[F]): Self[S] =
flatXmapWithIntermediateEffects(f)(g)(g)
def flatXmapWithIntermediateEffects[S](f: R => F[S])
(beforePersisting: S => F[R])(beforeFinalization: S => F[R])
(implicit F: Monad[F]): Self[S]

def xmap[S](f: R => S)(g: S => R)(implicit F: Monad[F]): Self[S] =
flatXmap(r => F.pure(f(r)))(s => F.pure(g(s)))
final def toTwoPhased(implicit F: Monad[F], playerHasUuid: HasUuid[Player]): Phased.TwoPhased[F, Player, R] =
this match {
case s@Phased.SinglePhased(_, _, _) => s.augmentToTwoPhased((_, r) => F.pure(r))(F.pure[R])
case t@Phased.TwoPhased(_, _) => t
}

def toRefRepository(implicit F: Sync[F]): Self[Ref[F, R]] =
flatXmap(r => Ref.of[F, R](r))(ref => ref.get)
final def flatXmap[S](f: R => F[S])(g: S => F[R])(implicit F: Monad[F]): Self[S] =
flatXmapWithIntermediateEffects(f)(g)(g)

def augmentF[S](f: R => F[S])(implicit F: Monad[F]): Self[(R, S)] =
flatXmap(r => F.map(f(r))(s => (r, s)))(rs => F.pure(rs._1))
}
final def xmap[S](f: R => S)(g: S => R)(implicit F: Monad[F]): Self[S] =
flatXmap(r => F.pure(f(r)))(s => F.pure(g(s)))

object RepositoryDefinition {
final def toRefRepository(implicit F: Sync[F]): Self[Ref[F, R]] =
flatXmap(r => Ref.of[F, R](r))(ref => ref.get)

import cats.implicits._
final def augmentF[S](f: R => F[S])(implicit F: Monad[F]): Self[(R, S)] =
flatXmap(r => F.map(f(r))(s => (r, s)))(rs => F.pure(rs._1))
}

case class SinglePhased[F[_], Player, R](initialization: SinglePhasedRepositoryInitialization[F, R],
tappingAction: (Player, R) => F[Unit],
finalization: RepositoryFinalization[F, UUID, R])
extends RepositoryDefinition[F, Player, R] {
object Phased {

override type Self[S] = SinglePhased[F, Player, S]
case class SinglePhased[F[_], Player, R](initialization: SinglePhasedRepositoryInitialization[F, R],
tappingAction: (Player, R) => F[Unit],
finalization: RepositoryFinalization[F, UUID, R])
extends Phased[F, Player, R] {

override def flatXmapWithIntermediateEffects[S](f: R => F[S])
(beforePersisting: S => F[R])(beforeFinalization: S => F[R])
(implicit F: Monad[F]): SinglePhased[F, Player, S] =
RepositoryDefinition.SinglePhased(
(uuid, name) => initialization.prepareData(uuid, name).flatMap(_.traverse(f)),
(player, s) => beforePersisting(s).flatMap(tappingAction(player, _)),
finalization.withIntermediateEffects(beforePersisting)(beforeFinalization)
)
override type Self[S] = SinglePhased[F, Player, S]

override def flatXmapWithIntermediateEffects[S](f: R => F[S])
(beforePersisting: S => F[R])(beforeFinalization: S => F[R])
(implicit F: Monad[F]): SinglePhased[F, Player, S] =
SinglePhased(
(uuid, name) => initialization.prepareData(uuid, name).flatMap(_.traverse(f)),
(player, s) => beforePersisting(s).flatMap(tappingAction(player, _)),
finalization.withIntermediateEffects(beforePersisting)(beforeFinalization)
)

def withAnotherTappingAction[U](another: (Player, R) => F[U])
(implicit F: Apply[F]): SinglePhased[F, Player, R] = this.copy(
tappingAction = (player, r) => F.productR(tappingAction(player, r))(another(player, r).as(()))
)

def augmentToTwoPhased[T](prepareFinalData: (Player, R) => F[T])(revertOnFinalization: T => F[R])
(implicit F: Monad[F], playerHasUuid: HasUuid[Player]): TwoPhased[F, Player, T] =
TwoPhased(
TwoPhasedRepositoryInitialization.augment(initialization)((player, r) =>
tappingAction(player, r) >> prepareFinalData(player, r)
),
finalization
.withIntermediateEffect(revertOnFinalization)
.contraMapKey(playerHasUuid.asFunction)
def withAnotherTappingAction[U](another: (Player, R) => F[U])
(implicit F: Apply[F]): SinglePhased[F, Player, R] = this.copy(
tappingAction = (player, r) => F.productR(tappingAction(player, r))(another(player, r).as(()))
)
}

object SinglePhased {
def withoutTappingAction[
F[_] : Applicative, Player, R
](initialization: SinglePhasedRepositoryInitialization[F, R],
finalization: RepositoryFinalization[F, UUID, R]): SinglePhased[F, Player, R] = {
SinglePhased(initialization, (_, _) => Applicative[F].unit, finalization)
def augmentToTwoPhased[T](prepareFinalData: (Player, R) => F[T])(revertOnFinalization: T => F[R])
(implicit F: Monad[F], playerHasUuid: HasUuid[Player]): TwoPhased[F, Player, T] =
TwoPhased(
TwoPhasedRepositoryInitialization.augment(initialization)((player, r) =>
tappingAction(player, r) >> prepareFinalData(player, r)
),
finalization
.withIntermediateEffect(revertOnFinalization)
.contraMapKey(playerHasUuid.asFunction)
)
}

def trivial[F[_] : Applicative, Player]: SinglePhased[F, Player, Unit] = withoutTappingAction(
SinglePhasedRepositoryInitialization.constant(()),
RepositoryFinalization.trivial
)

def withSupplierAndTrivialFinalization[F[_] : Monad, Player, R](supplier: F[R]): SinglePhased[F, Player, R] =
trivial[F, Player].flatXmap(_ => supplier)(_ => Applicative[F].unit)
}

case class TwoPhased[F[_], Player, R](initialization: TwoPhasedRepositoryInitialization[F, Player, R],
finalization: RepositoryFinalization[F, Player, R])
extends RepositoryDefinition[F, Player, R] {
override type Self[S] = TwoPhased[F, Player, S]

def flatXmapWithPlayerAndIntermediateEffects[S](f: Player => R => F[S])
(beforePersisting: S => F[R])(beforeFinalization: S => F[R])
(implicit F: Monad[F]): TwoPhased[F, Player, S] =
RepositoryDefinition.TwoPhased(
initialization.extendPreparation(f),
finalization.withIntermediateEffects(beforePersisting)(beforeFinalization)
object SinglePhased {
def withoutTappingAction[
F[_] : Applicative, Player, R
](initialization: SinglePhasedRepositoryInitialization[F, R],
finalization: RepositoryFinalization[F, UUID, R]): SinglePhased[F, Player, R] = {
SinglePhased(initialization, (_, _) => Applicative[F].unit, finalization)
}

def trivial[F[_] : Applicative, Player]: SinglePhased[F, Player, Unit] = withoutTappingAction(
SinglePhasedRepositoryInitialization.constant(()),
RepositoryFinalization.trivial
)

override def flatXmapWithIntermediateEffects[S](f: R => F[S])
(beforePersisting: S => F[R])(beforeFinalization: S => F[R])
(implicit F: Monad[F]): TwoPhased[F, Player, S] =
flatXmapWithPlayerAndIntermediateEffects(_ => f)(beforePersisting)(beforeFinalization)
def withSupplierAndTrivialFinalization[F[_] : Monad, Player, R](supplier: F[R]): SinglePhased[F, Player, R] =
trivial[F, Player].flatXmap(_ => supplier)(_ => Applicative[F].unit)
}

def flatXmapWithPlayer[S](f: Player => R => F[S])(g: S => F[R])(implicit F: Monad[F]): TwoPhased[F, Player, S] =
flatXmapWithPlayerAndIntermediateEffects(f)(g)(g)
case class TwoPhased[F[_], Player, R](initialization: TwoPhasedRepositoryInitialization[F, Player, R],
finalization: RepositoryFinalization[F, Player, R])
extends Phased[F, Player, R] {
override type Self[S] = TwoPhased[F, Player, S]

def flatXmapWithPlayerAndIntermediateEffects[S](f: Player => R => F[S])
(beforePersisting: S => F[R])(beforeFinalization: S => F[R])
(implicit F: Monad[F]): TwoPhased[F, Player, S] =
TwoPhased(
initialization.extendPreparation(f),
finalization.withIntermediateEffects(beforePersisting)(beforeFinalization)
)

override def flatXmapWithIntermediateEffects[S](f: R => F[S])
(beforePersisting: S => F[R])(beforeFinalization: S => F[R])
(implicit F: Monad[F]): TwoPhased[F, Player, S] =
flatXmapWithPlayerAndIntermediateEffects(_ => f)(beforePersisting)(beforeFinalization)

def flatXmapWithPlayer[S](f: Player => R => F[S])(g: S => F[R])(implicit F: Monad[F]): TwoPhased[F, Player, S] =
flatXmapWithPlayerAndIntermediateEffects(f)(g)(g)
}
}

case class Mapped[F[_], Player, S, R](source: RepositoryDefinition[F, Player, S], sr: S => R)
extends RepositoryDefinition[F, Player, R]
}
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,7 @@ object Fs3Channel {
// The latest implementation of fs2.concurrent.Channel would poll on the producer.get
// to allow cancellation of the send action. This is achieved by the MonadCancel typeclass of
// cats-effect 3, which is not available on cats-effect 2.
// However, on SeichiAssist, we hardly cancel any concurrent action,
// hence we are not allowing any cancellation to happen.
// We are temporarily not allowing any cancellation to happen here, but this can be improved.
notifyStream(waiting) <* producer.get
)
}.flatten
Expand Down
Loading

0 comments on commit 4779400

Please sign in to comment.