Skip to content

Commit

Permalink
wip io
Browse files Browse the repository at this point in the history
  • Loading branch information
whyoleg committed May 25, 2024
1 parent ca5cce8 commit 12812a1
Show file tree
Hide file tree
Showing 64 changed files with 402 additions and 461 deletions.
6 changes: 4 additions & 2 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[versions]
kotlin = "2.0.0"

kotlinx-io = "0.3.5"
kotlinx-atomicfu = "0.24.0"
kotlinx-coroutines = "1.8.1"
kotlinx-benchmark = "0.4.8"
Expand All @@ -23,14 +24,15 @@ jmh = "1.36"
maven-publish = "0.28.0"

[libraries]
kotlinx-io-core = { module = "org.jetbrains.kotlinx:kotlinx-io-core", version.ref = "kotlinx-io" }

kotlinx-coroutines-core = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-core", version.ref = "kotlinx-coroutines" }
kotlinx-coroutines-test = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-test", version.ref = "kotlinx-coroutines" }
kotlinx-coroutines-reactor = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-reactor", version.ref = "kotlinx-coroutines" }

kotlinx-benchmark = { module = "org.jetbrains.kotlinx:kotlinx-benchmark-runtime", version.ref = "kotlinx-benchmark" }

ktor-io = { module = "io.ktor:ktor-io", version.ref = "ktor" }
ktor-utils = { module = "io.ktor:ktor-utils", version.ref = "ktor" }

ktor-network = { module = "io.ktor:ktor-network", version.ref = "ktor" }
ktor-websockets = { module = "io.ktor:ktor-websockets", version.ref = "ktor" }
ktor-client-core = { module = "io.ktor:ktor-client-core", version.ref = "ktor" }
Expand Down
2 changes: 1 addition & 1 deletion rsocket-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ kotlin {
implementation(projects.rsocketInternalIo)

api(libs.kotlinx.coroutines.core)
api(libs.ktor.io)
api(libs.kotlinx.io.core)
}
commonTest.dependencies {
implementation(projects.rsocketTest)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@

package io.rsocket.kotlin

import io.ktor.utils.io.core.*
import kotlinx.coroutines.*
import kotlinx.io.*

@Deprecated(level = DeprecationLevel.ERROR, message = "Deprecated in favor of new Transport API")
public interface Connection : CoroutineScope {
public suspend fun send(packet: ByteReadPacket)
public suspend fun receive(): ByteReadPacket
public suspend fun send(packet: Source)
public suspend fun receive(): Source
}
12 changes: 6 additions & 6 deletions rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/RSocket.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2022 the original author or authors.
* Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,14 +16,14 @@

package io.rsocket.kotlin

import io.ktor.utils.io.core.*
import io.rsocket.kotlin.payload.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.io.*

public interface RSocket : CoroutineScope {

public suspend fun metadataPush(metadata: ByteReadPacket) {
public suspend fun metadataPush(metadata: Source) {
metadata.close()
notImplemented("Metadata Push")
}
Expand Down Expand Up @@ -52,10 +52,10 @@ public interface RSocket : CoroutineScope {
private fun notImplemented(operation: String): Nothing = throw NotImplementedError("$operation is not implemented.")

/**
* Tries to emit [value], if emit failed, f.e. due cancellation, calls [Closeable.close] on [value].
* Better to use it instead of [FlowCollector.emit] with [Payload] or [ByteReadPacket] to avoid leaks of dropped elements.
* Tries to emit [value], if emit failed, f.e. due cancellation, calls [AutoCloseable.close] on [value].
* Better to use it instead of [FlowCollector.emit] with [Payload] or [Source] to avoid leaks of dropped elements.
*/
public suspend fun <C : Closeable> FlowCollector<C>.emitOrClose(value: C) {
public suspend fun <C : AutoCloseable> FlowCollector<C>.emitOrClose(value: C) {
try {
return emit(value)
} catch (e: Throwable) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2022 the original author or authors.
* Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,21 +16,21 @@

package io.rsocket.kotlin

import io.ktor.utils.io.core.*
import io.rsocket.kotlin.payload.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.io.*
import kotlin.coroutines.*

public class RSocketRequestHandlerBuilder internal constructor() {
private var metadataPush: (suspend RSocket.(metadata: ByteReadPacket) -> Unit)? = null
private var metadataPush: (suspend RSocket.(metadata: Source) -> Unit)? = null
private var fireAndForget: (suspend RSocket.(payload: Payload) -> Unit)? = null
private var requestResponse: (suspend RSocket.(payload: Payload) -> Payload)? = null
private var requestStream: (suspend RSocket.(payload: Payload) -> Flow<Payload>)? = null
private var requestChannel: (suspend RSocket.(initPayload: Payload, payloads: Flow<Payload>) -> Flow<Payload>)? =
null

public fun metadataPush(block: (suspend RSocket.(metadata: ByteReadPacket) -> Unit)) {
public fun metadataPush(block: (suspend RSocket.(metadata: Source) -> Unit)) {
check(metadataPush == null) { "Metadata Push handler already configured" }
metadataPush = block
}
Expand Down Expand Up @@ -78,13 +78,13 @@ public fun RSocketRequestHandler(

private class RSocketRequestHandler(
override val coroutineContext: CoroutineContext,
private val metadataPush: (suspend RSocket.(metadata: ByteReadPacket) -> Unit)? = null,
private val metadataPush: (suspend RSocket.(metadata: Source) -> Unit)? = null,
private val fireAndForget: (suspend RSocket.(payload: Payload) -> Unit)? = null,
private val requestResponse: (suspend RSocket.(payload: Payload) -> Payload)? = null,
private val requestStream: (suspend RSocket.(payload: Payload) -> Flow<Payload>)? = null,
private val requestChannel: (suspend RSocket.(initPayload: Payload, payloads: Flow<Payload>) -> Flow<Payload>)? = null,
) : RSocket {
override suspend fun metadataPush(metadata: ByteReadPacket): Unit =
override suspend fun metadataPush(metadata: Source): Unit =
metadataPush?.invoke(this, metadata) ?: super.metadataPush(metadata)

override suspend fun fireAndForget(payload: Payload): Unit =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package io.rsocket.kotlin.connection

import io.ktor.utils.io.core.*
import io.rsocket.kotlin.*
import io.rsocket.kotlin.frame.*
import io.rsocket.kotlin.internal.*
Expand All @@ -26,6 +25,7 @@ import io.rsocket.kotlin.payload.*
import io.rsocket.kotlin.transport.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.io.*
import kotlin.coroutines.*

// TODO: rename to just `Connection` after root `Connection` will be dropped
Expand All @@ -34,7 +34,7 @@ internal abstract class Connection2(
protected val frameCodec: FrameCodec,
// requestContext
final override val coroutineContext: CoroutineContext,
) : RSocket, Closeable {
) : RSocket, AutoCloseable {

// connection establishment part

Expand All @@ -45,30 +45,30 @@ internal abstract class Connection2(

// connection part

protected abstract suspend fun sendConnectionFrame(frame: ByteReadPacket)
protected abstract suspend fun sendConnectionFrame(frame: Source)
private suspend fun sendConnectionFrame(frame: Frame): Unit = sendConnectionFrame(frameCodec.encodeFrame(frame))

suspend fun sendError(cause: Throwable) {
sendConnectionFrame(ErrorFrame(0, cause))
}

private suspend fun sendMetadataPush(metadata: ByteReadPacket) {
private suspend fun sendMetadataPush(metadata: Source) {
sendConnectionFrame(MetadataPushFrame(metadata))
}

suspend fun sendKeepAlive(respond: Boolean, data: ByteReadPacket, lastPosition: Long) {
suspend fun sendKeepAlive(respond: Boolean, data: Source, lastPosition: Long) {
sendConnectionFrame(KeepAliveFrame(respond, lastPosition, data))
}

// operations part

protected abstract fun launchRequest(requestPayload: Payload, operation: RequesterOperation): Job
private suspend fun ensureActiveOrClose(closeable: Closeable) {
private suspend fun ensureActiveOrClose(closeable: AutoCloseable) {
currentCoroutineContext().ensureActive { closeable.close() }
coroutineContext.ensureActive { closeable.close() }
}

final override suspend fun metadataPush(metadata: ByteReadPacket) {
final override suspend fun metadataPush(metadata: Source) {
ensureActiveOrClose(metadata)
sendMetadataPush(metadata)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,20 @@

package io.rsocket.kotlin.connection

import io.ktor.utils.io.core.*
import io.rsocket.kotlin.frame.*
import io.rsocket.kotlin.frame.io.*
import io.rsocket.kotlin.keepalive.*
import io.rsocket.kotlin.payload.*
import io.rsocket.kotlin.transport.*
import kotlinx.io.*

// send/receive setup, resume, resume ok, lease, error
@RSocketTransportApi
internal abstract class ConnectionEstablishmentContext(
private val frameCodec: FrameCodec,
) {
protected abstract suspend fun receiveFrameRaw(): ByteReadPacket?
protected abstract suspend fun sendFrame(frame: ByteReadPacket)
protected abstract suspend fun receiveFrameRaw(): Source?
protected abstract suspend fun sendFrame(frame: Source)
private suspend fun sendFrame(frame: Frame): Unit = sendFrame(frameCodec.encodeFrame(frame))

// only setup|lease|resume|resume_ok|error frames
Expand All @@ -42,7 +42,7 @@ internal abstract class ConnectionEstablishmentContext(
version: Version,
honorLease: Boolean,
keepAlive: KeepAlive,
resumeToken: ByteReadPacket?,
resumeToken: Source?,
payloadMimeType: PayloadMimeType,
payload: Payload,
): Unit = sendFrame(SetupFrame(version, honorLease, keepAlive, resumeToken, payloadMimeType, payload))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@

package io.rsocket.kotlin.connection

import io.ktor.utils.io.core.*
import io.rsocket.kotlin.*
import io.rsocket.kotlin.frame.*
import io.rsocket.kotlin.keepalive.*
import io.rsocket.kotlin.operation.*
import io.rsocket.kotlin.transport.*
import kotlinx.coroutines.*
import kotlinx.io.*
import kotlin.coroutines.*

@RSocketTransportApi
Expand All @@ -41,19 +41,19 @@ internal class ConnectionInbound(
else -> frame.close()
}

private fun receiveMetadataPush(metadata: ByteReadPacket) {
private fun receiveMetadataPush(metadata: Source) {
launch {
responder.metadataPush(metadata)
}.invokeOnCompletion { metadata.close() }
}

@Suppress("UNUSED_PARAMETER") // will be used later
private fun receiveKeepAlive(respond: Boolean, data: ByteReadPacket, lastPosition: Long) {
private fun receiveKeepAlive(respond: Boolean, data: Source, lastPosition: Long) {
keepAliveHandler.receive(data, respond)
}

@Suppress("UNUSED_PARAMETER") // will be used later
private fun receiveLease(ttl: Int, numberOfRequests: Int, metadata: ByteReadPacket?) {
private fun receiveLease(ttl: Int, numberOfRequests: Int, metadata: Source?) {
metadata?.close()
error("Lease is not supported")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,22 @@

package io.rsocket.kotlin.connection

import io.ktor.utils.io.core.*
import io.rsocket.kotlin.*
import io.rsocket.kotlin.frame.*
import io.rsocket.kotlin.internal.*
import io.rsocket.kotlin.logging.*
import io.rsocket.kotlin.transport.*
import kotlinx.io.*

@RSocketLoggingApi
@RSocketTransportApi
internal fun RSocketConnectionHandler.logging(logger: Logger, bufferPool: BufferPool): RSocketConnectionHandler {
internal fun RSocketConnectionHandler.logging(logger: Logger): RSocketConnectionHandler {
if (!logger.isLoggable(LoggingLevel.DEBUG)) return this

return RSocketConnectionHandler {
handleConnection(
when (it) {
is RSocketSequentialConnection -> SequentialLoggingConnection(it, logger, bufferPool)
is RSocketMultiplexedConnection -> MultiplexedLoggingConnection(it, logger, bufferPool)
is RSocketSequentialConnection -> SequentialLoggingConnection(it, logger)
is RSocketMultiplexedConnection -> MultiplexedLoggingConnection(it, logger)
}
)
}
Expand All @@ -43,42 +42,31 @@ internal fun RSocketConnectionHandler.logging(logger: Logger, bufferPool: Buffer
private class SequentialLoggingConnection(
private val delegate: RSocketSequentialConnection,
private val logger: Logger,
private val bufferPool: BufferPool,
) : RSocketSequentialConnection {
override val isClosedForSend: Boolean get() = delegate.isClosedForSend

override suspend fun sendFrame(streamId: Int, frame: ByteReadPacket) {
logger.debug { "Send: ${dumpFrameToString(frame, bufferPool)}" }
delegate.sendFrame(streamId, frame)
override suspend fun sendFrame(streamId: Int, frame: Source) {
delegate.sendFrame(streamId, logger.dumping(frame, "Send: "))
}

override suspend fun receiveFrame(): ByteReadPacket? {
return delegate.receiveFrame()?.also { frame ->
logger.debug { "Receive: ${dumpFrameToString(frame, bufferPool)}" }
}
override suspend fun receiveFrame(): Source? {
return logger.dumping(delegate.receiveFrame() ?: return null, "Receive:")
}

}

private fun dumpFrameToString(frame: ByteReadPacket, bufferPool: BufferPool): String {
val length = frame.remaining
return frame.copy().use { it.readFrame(bufferPool).use { it.dump(length) } }
}

@RSocketLoggingApi
@RSocketTransportApi
private class MultiplexedLoggingConnection(
private val delegate: RSocketMultiplexedConnection,
private val logger: Logger,
private val bufferPool: BufferPool,
) : RSocketMultiplexedConnection {
override suspend fun createStream(): RSocketMultiplexedConnection.Stream {
return MultiplexedLoggingStream(delegate.createStream(), logger, bufferPool)
return MultiplexedLoggingStream(delegate.createStream(), logger)
}

override suspend fun acceptStream(): RSocketMultiplexedConnection.Stream? {
return delegate.acceptStream()?.let {
MultiplexedLoggingStream(it, logger, bufferPool)
MultiplexedLoggingStream(it, logger)
}
}
}
Expand All @@ -88,26 +76,34 @@ private class MultiplexedLoggingConnection(
private class MultiplexedLoggingStream(
private val delegate: RSocketMultiplexedConnection.Stream,
private val logger: Logger,
private val bufferPool: BufferPool,
) : RSocketMultiplexedConnection.Stream {
override val isClosedForSend: Boolean get() = delegate.isClosedForSend

override fun setSendPriority(priority: Int) {
delegate.setSendPriority(priority)
}

override suspend fun sendFrame(frame: ByteReadPacket) {
logger.debug { "Send: ${dumpFrameToString(frame, bufferPool)}" }
delegate.sendFrame(frame)
override suspend fun sendFrame(frame: Source) {
delegate.sendFrame(logger.dumping(frame, "Send: "))
}

override suspend fun receiveFrame(): ByteReadPacket? {
return delegate.receiveFrame()?.also { frame ->
logger.debug { "Receive: ${dumpFrameToString(frame, bufferPool)}" }
}
override suspend fun receiveFrame(): Source? {
return logger.dumping(delegate.receiveFrame() ?: return null, "Receive:")
}

override fun close() {
delegate.close()
}
}

@RSocketLoggingApi
private fun Logger.dumping(frame: Source, tag: String): Source {
val buffer = Buffer()
frame.transferTo(buffer)
debug {
val length = buffer.size
val dump = buffer.copy().readFrame().dump(length)
"$tag $dump"
}
return buffer
}
Loading

0 comments on commit 12812a1

Please sign in to comment.