From ca4659a45497a0faf83bc1970c83df3814edf8c5 Mon Sep 17 00:00:00 2001 From: Oleg Yukhnevich Date: Fri, 17 Nov 2023 00:45:05 +0200 Subject: [PATCH] migrate nodejs transport --- .../transport/nodejs/tcp/NodejsTcpAddress.kt | 22 +++++ .../nodejs/tcp/NodejsTcpClientTransport.kt | 64 ++++++++++++ .../nodejs/tcp/NodejsTcpServerTransport.kt | 97 +++++++++++++++++++ .../transport/nodejs/tcp/NodejsTcpSession.kt | 67 +++++++++++++ .../transport/nodejs/tcp/TcpTransportTest.kt | 8 ++ 5 files changed, 258 insertions(+) create mode 100644 rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/NodejsTcpAddress.kt create mode 100644 rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/NodejsTcpClientTransport.kt create mode 100644 rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/NodejsTcpServerTransport.kt create mode 100644 rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/NodejsTcpSession.kt diff --git a/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/NodejsTcpAddress.kt b/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/NodejsTcpAddress.kt new file mode 100644 index 00000000..6b7ab015 --- /dev/null +++ b/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/NodejsTcpAddress.kt @@ -0,0 +1,22 @@ +/* + * Copyright 2015-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.transport.nodejs.tcp + +public class NodejsTcpAddress( + public val hostname: String, + public val port: Int, +) diff --git a/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/NodejsTcpClientTransport.kt b/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/NodejsTcpClientTransport.kt new file mode 100644 index 00000000..eeec98e6 --- /dev/null +++ b/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/NodejsTcpClientTransport.kt @@ -0,0 +1,64 @@ +/* + * Copyright 2015-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.transport.nodejs.tcp + +import io.rsocket.kotlin.internal.io.* +import io.rsocket.kotlin.transport.* +import io.rsocket.kotlin.transport.nodejs.tcp.internal.* +import kotlinx.coroutines.* +import kotlin.coroutines.* + +public sealed interface NodejsTcpClientTransport : RSocketClientTransport { + public val address: NodejsTcpAddress + + public companion object Factory : RSocketTransportFactory< + NodejsTcpAddress, + NodejsTcpClientTransport, + NodejsTcpClientTransportBuilder>({ NodejsTcpClientTransportBuilderImpl }) { + + public operator fun invoke( + context: CoroutineContext, + hostname: String, + port: Int, + block: NodejsTcpClientTransportBuilder.() -> Unit = {}, + ): NodejsTcpClientTransport = invoke(context, NodejsTcpAddress(hostname, port), block) + } +} + +public sealed interface NodejsTcpClientTransportBuilder : RSocketTransportBuilder + +private object NodejsTcpClientTransportBuilderImpl : NodejsTcpClientTransportBuilder { + @RSocketTransportApi + override fun buildTransport(context: CoroutineContext, target: NodejsTcpAddress): NodejsTcpClientTransport = + NodejsTcpClientTransportImpl( + coroutineContext = context.supervisorContext(), + address = target, + ) +} + +private class NodejsTcpClientTransportImpl( + override val coroutineContext: CoroutineContext, + override val address: NodejsTcpAddress, +) : NodejsTcpClientTransport { + + @RSocketTransportApi + override suspend fun createSession(): RSocketTransportSession { + ensureActive() + + return NodejsTcpSession(coroutineContext.childContext(), connect(address.port, address.hostname)) + } +} diff --git a/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/NodejsTcpServerTransport.kt b/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/NodejsTcpServerTransport.kt new file mode 100644 index 00000000..02d0dd6e --- /dev/null +++ b/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/NodejsTcpServerTransport.kt @@ -0,0 +1,97 @@ +/* + * Copyright 2015-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.transport.nodejs.tcp + +import io.rsocket.kotlin.internal.io.* +import io.rsocket.kotlin.transport.* +import io.rsocket.kotlin.transport.nodejs.tcp.internal.* +import kotlinx.coroutines.* +import kotlin.coroutines.* + +public sealed interface NodejsTcpServerInstance : RSocketServerInstance { + public val address: NodejsTcpAddress +} + +public sealed interface NodejsTcpServerTransport : RSocketServerTransport { + public val address: NodejsTcpAddress + + public companion object Factory : RSocketTransportFactory< + NodejsTcpAddress, + NodejsTcpServerTransport, + NodejsTcpServerTransportBuilder>({ NodejsTcpServerTransportBuilderImpl }) { + + public operator fun invoke( + context: CoroutineContext, + hostname: String = "0.0.0.0", + port: Int = 0, + block: NodejsTcpServerTransportBuilder.() -> Unit = {}, + ): NodejsTcpServerTransport = invoke(context, NodejsTcpAddress(hostname, port), block) + } +} + +public sealed interface NodejsTcpServerTransportBuilder : RSocketTransportBuilder + +private object NodejsTcpServerTransportBuilderImpl : NodejsTcpServerTransportBuilder { + @RSocketTransportApi + override fun buildTransport(context: CoroutineContext, target: NodejsTcpAddress): NodejsTcpServerTransport = + NodejsTcpServerTransportImpl( + coroutineContext = context.supervisorContext(), + address = target, + ) +} + +private class NodejsTcpServerTransportImpl( + override val coroutineContext: CoroutineContext, + override val address: NodejsTcpAddress, +) : NodejsTcpServerTransport { + + @RSocketTransportApi + override suspend fun startServer(acceptor: RSocketServerAcceptor): NodejsTcpServerInstance { + ensureActive() + + return NodejsTcpServerInstanceImpl( + coroutineContext = coroutineContext.supervisorContext(), + address = address, + acceptor = acceptor, + ) + } +} + +@RSocketTransportApi +private class NodejsTcpServerInstanceImpl( + override val coroutineContext: CoroutineContext, + override val address: NodejsTcpAddress, + private val acceptor: RSocketServerAcceptor, +) : NodejsTcpServerInstance { + init { + val server = createServer(address.port, address.hostname, { + coroutineContext.job.cancel("Server closed") + }) { + launch { + acceptor.acceptSession(NodejsTcpSession(coroutineContext.childContext(), it)) + } + } + launch { + try { + awaitCancellation() + } catch (cause: Throwable) { + suspendCoroutine { cont -> server.close { cont.resume(Unit) } } + throw cause + } + } + } +} diff --git a/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/NodejsTcpSession.kt b/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/NodejsTcpSession.kt new file mode 100644 index 00000000..37188d93 --- /dev/null +++ b/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/NodejsTcpSession.kt @@ -0,0 +1,67 @@ +/* + * Copyright 2015-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.transport.nodejs.tcp + +import io.ktor.utils.io.core.* +import io.ktor.utils.io.js.* +import io.rsocket.kotlin.internal.io.* +import io.rsocket.kotlin.transport.* +import io.rsocket.kotlin.transport.nodejs.tcp.internal.* +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import org.khronos.webgl.* +import kotlin.coroutines.* + +@RSocketTransportApi +internal class NodejsTcpSession( + override val coroutineContext: CoroutineContext, + private val socket: Socket, +) : RSocketTransportSession.Sequential { + + private val sendChannel = channelForCloseable(8) + private val receiveChannel = channelForCloseable(Channel.UNLIMITED) + + init { + launch { + sendChannel.consumeEach { packet -> + socket.write(Uint8Array(packet.withLength().readArrayBuffer())) + } + } + + coroutineContext.job.invokeOnCompletion { + when (it) { + null -> socket.destroy() + else -> socket.destroy(Error(it.message, it.cause)) + } + } + + val frameAssembler = FrameWithLengthAssembler { receiveChannel.trySend(it) } //TODO + socket.on( + onData = { frameAssembler.write { writeFully(it.buffer) } }, + onError = { coroutineContext.job.cancel("Socket error", it) }, + onClose = { if (!it) coroutineContext.job.cancel("Socket closed") } + ) + } + + override suspend fun sendFrame(frame: ByteReadPacket) { + sendChannel.send(frame) + } + + override suspend fun receiveFrame(): ByteReadPacket { + return receiveChannel.receive() + } +} diff --git a/rsocket-transport-nodejs-tcp/src/jsTest/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpTransportTest.kt b/rsocket-transport-nodejs-tcp/src/jsTest/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpTransportTest.kt index 0addb489..73907a1f 100644 --- a/rsocket-transport-nodejs-tcp/src/jsTest/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpTransportTest.kt +++ b/rsocket-transport-nodejs-tcp/src/jsTest/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpTransportTest.kt @@ -34,3 +34,11 @@ class TcpTransportTest : TransportTest() { server.close() } } + +class NodejsTcpTransportTest : TransportTest() { + override suspend fun before() { + val port = PortProvider.next() + val server = startServer(NodejsTcpServerTransport(testContext, port = port)) + client = connectClient(NodejsTcpClientTransport(testContext, server.address)) + } +}