Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Duration as the ground truth for communicating durations #4256

Draft
wants to merge 1 commit into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/topics/cancellation-and-timeouts.md
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ It produces the following output:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1300 ms
Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1.3s
```

<!--- TEST STARTS_WITH -->
Expand Down
11 changes: 6 additions & 5 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -297,20 +297,21 @@ public final class kotlinx/coroutines/Deferred$DefaultImpls {
}

public abstract interface class kotlinx/coroutines/Delay {
public abstract fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun invokeOnTimeout (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle;
public abstract fun scheduleResumeAfterDelay (JLkotlinx/coroutines/CancellableContinuation;)V
public abstract fun invokeOnTimeout-KLykuaI (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle;
public abstract fun scheduleResumeAfterDelay-VtjQ1oo (JLkotlinx/coroutines/CancellableContinuation;)V
public abstract fun timeoutMessage-LRDsOJo (J)Ljava/lang/String;
}

public final class kotlinx/coroutines/Delay$DefaultImpls {
public static fun delay (Lkotlinx/coroutines/Delay;JLkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static fun invokeOnTimeout (Lkotlinx/coroutines/Delay;JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle;
public static fun invokeOnTimeout-KLykuaI (Lkotlinx/coroutines/Delay;JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle;
public static fun timeoutMessage-LRDsOJo (Lkotlinx/coroutines/Delay;J)Ljava/lang/String;
}

public final class kotlinx/coroutines/DelayKt {
public static final fun awaitCancellation (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun delay-VtjQ1oo (JLkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun toDelayMillis-LRDsOJo (J)J
}

public abstract interface annotation class kotlinx/coroutines/DelicateCoroutinesApi : java/lang/annotation/Annotation {
Expand Down
7 changes: 4 additions & 3 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api
Original file line number Diff line number Diff line change
Expand Up @@ -289,9 +289,9 @@ abstract interface kotlinx.coroutines/CoroutineScope { // kotlinx.coroutines/Cor
}

abstract interface kotlinx.coroutines/Delay { // kotlinx.coroutines/Delay|null[0]
abstract fun scheduleResumeAfterDelay(kotlin/Long, kotlinx.coroutines/CancellableContinuation<kotlin/Unit>) // kotlinx.coroutines/Delay.scheduleResumeAfterDelay|scheduleResumeAfterDelay(kotlin.Long;kotlinx.coroutines.CancellableContinuation<kotlin.Unit>){}[0]
open fun invokeOnTimeout(kotlin/Long, kotlinx.coroutines/Runnable, kotlin.coroutines/CoroutineContext): kotlinx.coroutines/DisposableHandle // kotlinx.coroutines/Delay.invokeOnTimeout|invokeOnTimeout(kotlin.Long;kotlinx.coroutines.Runnable;kotlin.coroutines.CoroutineContext){}[0]
open suspend fun delay(kotlin/Long) // kotlinx.coroutines/Delay.delay|delay(kotlin.Long){}[0]
abstract fun scheduleResumeAfterDelay(kotlin.time/Duration, kotlinx.coroutines/CancellableContinuation<kotlin/Unit>) // kotlinx.coroutines/Delay.scheduleResumeAfterDelay|scheduleResumeAfterDelay(kotlin.time.Duration;kotlinx.coroutines.CancellableContinuation<kotlin.Unit>){}[0]
open fun invokeOnTimeout(kotlin.time/Duration, kotlinx.coroutines/Runnable, kotlin.coroutines/CoroutineContext): kotlinx.coroutines/DisposableHandle // kotlinx.coroutines/Delay.invokeOnTimeout|invokeOnTimeout(kotlin.time.Duration;kotlinx.coroutines.Runnable;kotlin.coroutines.CoroutineContext){}[0]
open fun timeoutMessage(kotlin.time/Duration): kotlin/String // kotlinx.coroutines/Delay.timeoutMessage|timeoutMessage(kotlin.time.Duration){}[0]
}

abstract interface kotlinx.coroutines/Job : kotlin.coroutines/CoroutineContext.Element { // kotlinx.coroutines/Job|null[0]
Expand Down Expand Up @@ -758,6 +758,7 @@ final fun (kotlin.coroutines/CoroutineContext).kotlinx.coroutines/ensureActive()
final fun (kotlin.coroutines/CoroutineContext).kotlinx.coroutines/newCoroutineContext(kotlin.coroutines/CoroutineContext): kotlin.coroutines/CoroutineContext // kotlinx.coroutines/newCoroutineContext|[email protected](kotlin.coroutines.CoroutineContext){}[0]
final fun (kotlin.ranges/IntRange).kotlinx.coroutines.flow/asFlow(): kotlinx.coroutines.flow/Flow<kotlin/Int> // kotlinx.coroutines.flow/asFlow|[email protected](){}[0]
final fun (kotlin.ranges/LongRange).kotlinx.coroutines.flow/asFlow(): kotlinx.coroutines.flow/Flow<kotlin/Long> // kotlinx.coroutines.flow/asFlow|[email protected](){}[0]
final fun (kotlin.time/Duration).kotlinx.coroutines/toDelayMillis(): kotlin/Long // kotlinx.coroutines/toDelayMillis|[email protected](){}[0]
final fun (kotlin/IntArray).kotlinx.coroutines.flow/asFlow(): kotlinx.coroutines.flow/Flow<kotlin/Int> // kotlinx.coroutines.flow/asFlow|[email protected](){}[0]
final fun (kotlin/LongArray).kotlinx.coroutines.flow/asFlow(): kotlinx.coroutines.flow/Flow<kotlin/Long> // kotlinx.coroutines.flow/asFlow|[email protected](){}[0]
final fun (kotlinx.coroutines.channels/ReceiveChannel<*>).kotlinx.coroutines.channels/cancelConsumed(kotlin/Throwable?) // kotlinx.coroutines.channels/cancelConsumed|[email protected]<*>(kotlin.Throwable?){}[0]
Expand Down
57 changes: 21 additions & 36 deletions kotlinx-coroutines-core/common/src/Delay.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import kotlinx.coroutines.selects.*
import kotlin.coroutines.*
import kotlin.time.*
import kotlin.time.Duration.Companion.nanoseconds
import kotlin.time.Duration.Companion.milliseconds

/**
* This dispatcher _feature_ is implemented by [CoroutineDispatcher] implementations that natively support
Expand All @@ -16,19 +17,8 @@ import kotlin.time.Duration.Companion.nanoseconds
*/
@InternalCoroutinesApi
public interface Delay {

/** @suppress **/
@Deprecated(
message = "Deprecated without replacement as an internal method never intended for public use",
level = DeprecationLevel.ERROR
) // Error since 1.6.0
public suspend fun delay(time: Long) {
if (time <= 0) return // don't delay
return suspendCancellableCoroutine { scheduleResumeAfterDelay(time, it) }
}

/**
* Schedules resume of a specified [continuation] after a specified delay [timeMillis].
* Schedules resume of a specified [continuation] after a specified delay [time].
*
* Continuation **must be scheduled** to resume even if it is already cancelled, because a cancellation is just
* an exception that the coroutine that used `delay` might wanted to catch and process. It might
Expand All @@ -42,28 +32,20 @@ public interface Delay {
* with(continuation) { resumeUndispatchedWith(Unit) }
* ```
*/
public fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>)
public fun scheduleResumeAfterDelay(time: Duration, continuation: CancellableContinuation<Unit>)

/**
* Schedules invocation of a specified [block] after a specified delay [timeMillis].
* Schedules invocation of a specified [block] after a specified delay [timeout].
* The resulting [DisposableHandle] can be used to [dispose][DisposableHandle.dispose] of this invocation
* request if it is not needed anymore.
*/
public fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
DefaultDelay.invokeOnTimeout(timeMillis, block, context)
}
public fun invokeOnTimeout(timeout: Duration, block: Runnable, context: CoroutineContext): DisposableHandle =
DefaultDelay.invokeOnTimeout(timeout, block, context)

/**
* Enhanced [Delay] interface that provides additional diagnostics for [withTimeout].
* Is going to be removed once there is proper JVM-default support.
* Then we'll be able put this function into [Delay] without breaking binary compatibility.
*/
@InternalCoroutinesApi
internal interface DelayWithTimeoutDiagnostics : Delay {
/**
* Returns a string that explains that the timeout has occurred, and explains what can be done about it.
*/
fun timeoutMessage(timeout: Duration): String
fun timeoutMessage(timeout: Duration): String = "Timed out waiting for $timeout"
}

/**
Expand Down Expand Up @@ -103,8 +85,8 @@ internal interface DelayWithTimeoutDiagnostics : Delay {
public suspend fun awaitCancellation(): Nothing = suspendCancellableCoroutine {}

/**
* Delays coroutine for at least the given time without blocking a thread and resumes it after a specified time.
* If the given [timeMillis] is non-positive, this function returns immediately.
* Delays coroutine for at least the given [duration] without blocking a thread and resumes it after the specified time.
* If the given [duration] is non-positive, this function returns immediately.
*
* This suspending function is cancellable: if the [Job] of the current coroutine is cancelled while this
* suspending function is waiting, this function immediately resumes with [CancellationException].
Expand All @@ -116,21 +98,20 @@ public suspend fun awaitCancellation(): Nothing = suspendCancellableCoroutine {}
* Note that delay can be used in [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause.
*
* Implementation note: how exactly time is tracked is an implementation detail of [CoroutineDispatcher] in the context.
* @param timeMillis time in milliseconds.
*/
public suspend fun delay(timeMillis: Long) {
if (timeMillis <= 0) return // don't delay
public suspend fun delay(duration: Duration) {
if (duration <= Duration.ZERO) return // don't delay
return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
// if timeMillis == Long.MAX_VALUE then just wait forever like awaitCancellation, don't schedule.
if (timeMillis < Long.MAX_VALUE) {
cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
// instead of actually waiting for an infinite time, just wait forever like awaitCancellation, don't schedule.
if (duration.isFinite()) {
cont.context.delay.scheduleResumeAfterDelay(duration, cont)
}
}
}

/**
* Delays coroutine for at least the given [duration] without blocking a thread and resumes it after the specified time.
* If the given [duration] is non-positive, this function returns immediately.
* Delays coroutine for at least the given time without blocking a thread and resumes it after a specified time.
* If the given [timeMillis] is non-positive, this function returns immediately.
*
* This suspending function is cancellable: if the [Job] of the current coroutine is cancelled while this
* suspending function is waiting, this function immediately resumes with [CancellationException].
Expand All @@ -142,8 +123,11 @@ public suspend fun delay(timeMillis: Long) {
* Note that delay can be used in [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause.
*
* Implementation note: how exactly time is tracked is an implementation detail of [CoroutineDispatcher] in the context.
* @param timeMillis time in milliseconds.
*/
public suspend fun delay(duration: Duration): Unit = delay(duration.toDelayMillis())
public suspend fun delay(timeMillis: Long) {
delay(timeMillis.milliseconds)
}

/** Returns [Delay] implementation of the given context */
internal val CoroutineContext.delay: Delay get() = get(ContinuationInterceptor) as? Delay ?: DefaultDelay
Expand All @@ -152,6 +136,7 @@ internal val CoroutineContext.delay: Delay get() = get(ContinuationInterceptor)
* Convert this duration to its millisecond value. Durations which have a nanosecond component less than
* a single millisecond will be rounded up to the next largest millisecond.
*/
@PublishedApi
internal fun Duration.toDelayMillis(): Long = when (isPositive()) {
true -> plus(999_999L.nanoseconds).inWholeMilliseconds
false -> 0L
Expand Down
21 changes: 5 additions & 16 deletions kotlinx-coroutines-core/common/src/EventLoop.common.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import kotlinx.coroutines.internal.*
import kotlin.concurrent.Volatile
import kotlin.coroutines.*
import kotlin.jvm.*
import kotlin.time.Duration

/**
* Extended by [CoroutineDispatcher] implementations that have event loop inside and can
Expand Down Expand Up @@ -144,24 +145,12 @@ private const val SCHEDULE_OK = 0
private const val SCHEDULE_COMPLETED = 1
private const val SCHEDULE_DISPOSED = 2

private const val MS_TO_NS = 1_000_000L
private const val MAX_MS = Long.MAX_VALUE / MS_TO_NS

/**
* First-line overflow protection -- limit maximal delay.
* Delays longer than this one (~146 years) are considered to be delayed "forever".
*/
private const val MAX_DELAY_NS = Long.MAX_VALUE / 2

internal fun delayToNanos(timeMillis: Long): Long = when {
timeMillis <= 0 -> 0L
timeMillis >= MAX_MS -> Long.MAX_VALUE
else -> timeMillis * MS_TO_NS
}

internal fun delayNanosToMillis(timeNanos: Long): Long =
timeNanos / MS_TO_NS

private val CLOSED_EMPTY = Symbol("CLOSED_EMPTY")

private typealias Queue<T> = LockFreeTaskQueueCore<T>
Expand Down Expand Up @@ -224,8 +213,8 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
rescheduleAllDelayed()
}

override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
val timeNanos = delayToNanos(timeMillis)
override fun scheduleResumeAfterDelay(time: Duration, continuation: CancellableContinuation<Unit>) {
val timeNanos = time.inWholeNanoseconds
if (timeNanos < MAX_DELAY_NS) {
val now = nanoTime()
DelayedResumeTask(now + timeNanos, continuation).also { task ->
Expand All @@ -240,8 +229,8 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
}
}

protected fun scheduleInvokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
val timeNanos = delayToNanos(timeMillis)
protected fun scheduleInvokeOnTimeout(timeout: Duration, block: Runnable): DisposableHandle {
val timeNanos = timeout.inWholeNanoseconds
return if (timeNanos < MAX_DELAY_NS) {
val now = nanoTime()
DelayedRunnableTask(now + timeNanos, block).also { task ->
Expand Down
Loading