Skip to content

Commit

Permalink
Migrated RpcClient and kRPC to new descriptor API
Browse files Browse the repository at this point in the history
  • Loading branch information
Mr3zee committed Nov 5, 2024
1 parent d089657 commit 7ac04ad
Show file tree
Hide file tree
Showing 18 changed files with 242 additions and 209 deletions.
37 changes: 4 additions & 33 deletions core/src/commonMain/kotlin/kotlinx/rpc/RPCCall.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,47 +4,18 @@

package kotlinx.rpc

import kotlin.reflect.KType
import kotlinx.rpc.descriptor.RpcServiceDescriptor

/**
* Represents a method or field call of an RPC service.
* Contains all types and values information for the call, so it can be passed to a server.
*
* @property serviceTypeString The service type as a string.
* @property serviceId The id of a service that is unique within [RPCClient] services
* @property callableName The name of the callable. Can be the name of the method or field.
* @property type The type of call;
* @property data The data for the call.
* It may be a generated class with all parameters and their values or empty class for fields.
* @property dataType The [KType] of the [data].
* @property returnType The [KType] of the return type.
* @property descriptor the descriptor of the service, that made the call.
*/
public data class RPCCall(
val serviceTypeString: String,
val serviceId: Long,
public data class RpcCall(
val descriptor: RpcServiceDescriptor<*>,
val callableName: String,
val type: Type,
val data: Any,
val dataType: KType,
val returnType: KType,
) {
public enum class Type {
Method, Field;
}
}

/**
* Represents a field of the RPC service.
* Can be internally converted to a [RPCCall], but it depends on a specific [RPCClient] implementation.
*
* @property serviceTypeString The service type as a string.
* @property serviceId The id of a service that is unique within [RPCClient] services
* @property name The name of the field.
* @property type The [KType] of the field.
*/
public data class RPCField(
val serviceTypeString: String,
val serviceId: Long,
val name: String,
val type: KType
)
51 changes: 12 additions & 39 deletions core/src/commonMain/kotlin/kotlinx/rpc/RPCClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,62 +5,35 @@
package kotlinx.rpc

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.Deferred
import kotlin.coroutines.CoroutineContext

/**
* RPCClient represents an abstraction of a RPC client, that can handle requests from several RPC services,
* [RpcClient] represents an abstraction of an RPC client, that can handle requests from several RPC services,
* transform them, send to the server and handle responses and errors.
* [CoroutineScope] defines the lifetime of the client.
*/
public interface RPCClient : CoroutineScope {
public interface RpcClient : CoroutineScope {
/**
* This method is used by generated clients to perform a call to the server.
*
* @param T type of the result
* @param call an object that contains all required information about the called method,
* that is needed to route it properly to the server.
* @return actual result of the call, e.g. data from the server.
* @return actual result of the call, for example, data from the server.
*/
public suspend fun <T> call(call: RPCCall): T
public suspend fun <T> call(call: RpcCall): T

/**
* Registers Flow<T> field of the interface. Sends initialization request, subscribes to emitted values
* and returns the instance of the flow to be consumed
*
* @param T type parameter for Flow
* @param serviceScope Service's coroutine scope
* @param field object that contains information about the field,
* that is used to be mapped to the corresponding field pn server.
* @return Flow instance to be consumed.
*/
public fun <T> registerPlainFlowField(serviceScope: CoroutineScope, field: RPCField): Flow<T>

/**
* Registers SharedFlow<T> field of the interface. Sends initialization request, subscribes to emitted values
* and returns the instance of the flow to be consumed
*
* @param T type parameter for SharedFlow
* @param serviceScope Service's coroutine scope
* @param field object that contains information about the field,
* that is used to be mapped to the corresponding field pn server.
* @return SharedFlow instance to be consumed.
*/
public fun <T> registerSharedFlowField(serviceScope: CoroutineScope, field: RPCField): SharedFlow<T>

/**
* Registers StateFlow<T> field of the interface. Sends initialization request, subscribes to emitted values
* and returns the instance of the flow to be consumed
* This method is used by generated clients to perform a call to the server.
*
* @param T type parameter for StateFlow
* @param serviceScope Service's coroutine scope
* @param field object that contains information about the field,
* that is used to be mapped to the corresponding field pn server.
* @return StateFlow instance to be consumed.
* @param T type of the result
* @param serviceScope service's coroutine scope
* @param call an object that contains all required information about the called method,
* that is needed to route it properly to the server.
* @return actual result of the call, for example, data from the server
*/
public fun <T> registerStateFlowField(serviceScope: CoroutineScope, field: RPCField): StateFlow<T>
public fun <T> callSync(serviceScope: CoroutineScope, call: RpcCall): Deferred<T>

/**
* Provides child [CoroutineContext] for a new [RemoteService] service stub.
Expand Down
2 changes: 1 addition & 1 deletion core/src/commonMain/kotlin/kotlinx/rpc/annotations/Rpc.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import kotlinx.rpc.RemoteService
* suspend fun sayHello(firstName: String, lastName: String, age: Int): String
* }
* // client code
* val rpcClient: RPCClient
* val rpcClient: RpcClient
* val myService = rpcClient.withService<MyService>()
* val greetingFromServer = myService.sayHello("Alex", "Smith", 35)
* // server code
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
package kotlinx.rpc

import kotlinx.rpc.descriptor.serviceDescriptorOf
import kotlinx.rpc.internal.RPCDeferredField
import kotlinx.rpc.internal.RpcDeferredField
import kotlin.reflect.KClass

/**
Expand All @@ -29,9 +29,9 @@ import kotlin.reflect.KClass
public suspend fun <T : RemoteService, R> T.awaitFieldInitialization(getter: T.() -> R): R {
val field = getter()

if (field is RPCDeferredField<*>) {
if (field is RpcDeferredField<*>) {
@Suppress("UNCHECKED_CAST")
return (field as RPCDeferredField<R>).await()
return (field as RpcDeferredField<R>).await()
}

error("Please choose required field for a valid RPC client generated by RPCClient.withService method")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

package kotlinx.rpc.descriptor

import kotlinx.rpc.RPCClient
import kotlinx.rpc.RemoteService
import kotlinx.rpc.RpcClient
import kotlinx.rpc.internal.*
import kotlinx.rpc.internal.utils.ExperimentalRPCApi
import kotlin.reflect.KClass
Expand Down Expand Up @@ -42,11 +42,11 @@ public fun <T : RemoteService> serviceDescriptorOf(kClass: KClass<T>): RpcServic
public interface RpcServiceDescriptor<T : RemoteService> {
public val fqName: String

public fun getFields(service: T): List<RPCDeferredField<*>>
public fun getFields(service: T): List<RpcDeferredField<*>>

public fun getCallable(name: String): RpcCallable<T>?

public fun createInstance(serviceId: Long, client: RPCClient): T
public fun createInstance(serviceId: Long, client: RpcClient): T
}

@ExperimentalRPCApi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright 2023-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.rpc.krpc.client.internal
package kotlinx.rpc.internal

import kotlinx.serialization.Serializable

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ package kotlinx.rpc.internal
import kotlinx.rpc.internal.utils.InternalRPCApi

@InternalRPCApi
public interface RPCDeferredField<Self> {
public interface RpcDeferredField<Self> {
public suspend fun await(): Self
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@
* Copyright 2023-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.rpc.krpc.client.internal
package kotlinx.rpc.internal

import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.rpc.UninitializedRPCFieldException
import kotlin.reflect.KProperty

internal class RPCFieldProvider<T, R>(
internal class RpcFieldProvider<T, R>(
private val serviceName: String,
private val deferred: CompletableDeferred<T> = CompletableDeferred(),
private val deferred: Deferred<T> = CompletableDeferred(),
val getter: T.() -> R,
) {
@OptIn(ExperimentalCoroutinesApi::class)
Expand All @@ -25,9 +26,9 @@ internal class RPCFieldProvider<T, R>(
}

@Suppress("unused")
internal fun <T> RPCFieldProvider(
internal fun <T> RpcFieldProvider(
serviceName: String,
deferred: CompletableDeferred<T> = CompletableDeferred()
): RPCFieldProvider<T, T> {
return RPCFieldProvider(serviceName, deferred) { this }
): RpcFieldProvider<T, T> {
return RpcFieldProvider(serviceName, deferred) { this }
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,43 +2,46 @@
* Copyright 2023-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.rpc.krpc.client.internal
package kotlinx.rpc.internal

import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Job
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.rpc.internal.RPCDeferredField
import kotlinx.rpc.internal.utils.SupervisedCompletableDeferred

internal sealed class RPCFlow<T, FlowT : Flow<T>>(private val serviceName: String, parent: Job) :
RPCDeferredField<FlowT> {
val deferred: CompletableDeferred<FlowT> = SupervisedCompletableDeferred(parent)

internal sealed class RpcFlow<T, FlowT : Flow<T>>(
private val serviceName: String,
protected val deferred: Deferred<FlowT>,
) : RpcDeferredField<FlowT> {
override suspend fun await(): FlowT {
return deferred.await()
}

internal class Plain<T>(serviceName: String, parent: Job) : RPCFlow<T, Flow<T>>(serviceName, parent),
Flow<T> {
internal class Plain<T>(
serviceName: String,
deferred: Deferred<Flow<T>>,
) : RpcFlow<T, Flow<T>>(serviceName, deferred), Flow<T> {
override suspend fun collect(collector: FlowCollector<T>) {
deferred.await().collect(collector)
}
}

internal class Shared<T>(serviceName: String, parent: Job) : RPCFlow<T, SharedFlow<T>>(serviceName, parent),
SharedFlow<T> {
internal class Shared<T>(
serviceName: String,
deferred: Deferred<SharedFlow<T>>,
) : RpcFlow<T, SharedFlow<T>>(serviceName, deferred), SharedFlow<T> {
override val replayCache: List<T> by rpcProperty { replayCache }

override suspend fun collect(collector: FlowCollector<T>): Nothing {
deferred.await().collect(collector)
}
}

internal class State<T>(serviceName: String, parent: Job) : RPCFlow<T, StateFlow<T>>(serviceName, parent),
StateFlow<T> {
internal class State<T>(
serviceName: String,
deferred: Deferred<StateFlow<T>>,
) : RpcFlow<T, StateFlow<T>>(serviceName, deferred), StateFlow<T> {
override val value: T by rpcProperty { value }

override val replayCache: List<T> by rpcProperty { replayCache }
Expand All @@ -48,7 +51,7 @@ internal sealed class RPCFlow<T, FlowT : Flow<T>>(private val serviceName: Strin
}
}

protected fun <R> rpcProperty(getter: FlowT.() -> R): RPCFieldProvider<FlowT, R> {
return RPCFieldProvider(serviceName, deferred, getter)
protected fun <R> rpcProperty(getter: FlowT.() -> R): RpcFieldProvider<FlowT, R> {
return RpcFieldProvider(serviceName, deferred, getter)
}
}
86 changes: 86 additions & 0 deletions core/src/commonMain/kotlin/kotlinx/rpc/registerField.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright 2023-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.rpc

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.rpc.descriptor.RpcServiceDescriptor
import kotlinx.rpc.internal.FieldDataObject
import kotlinx.rpc.internal.RpcFlow

/**
* Registers Flow<T> field of the interface. Sends initialization request, subscribes to emitted values
* and returns the instance of the flow to be consumed
*
* @param T type parameter for Flow
* @param serviceScope Service's coroutine scope
* @param fieldName the name of the field.
* @param descriptor descriptor of the service, that made the call
* that is used to be mapped to the corresponding field on a server.
* @param serviceId id of the service, that made the call
* @return Flow instance to be consumed.
*/
public fun <T> RpcClient.registerPlainFlowField(
serviceScope: CoroutineScope,
fieldName: String,
descriptor: RpcServiceDescriptor<*>,
serviceId: Long,
): Flow<T> {
return RpcFlow.Plain(descriptor.fqName, initializeFlowField(serviceScope, fieldName, descriptor, serviceId))
}

/**
* Registers SharedFlow<T> field of the interface. Sends initialization request, subscribes to emitted values
* and returns the instance of the flow to be consumed
*
* @param T type parameter for SharedFlow
* @param serviceScope Service's coroutine scope
* @param fieldName the name of the field.
* @param descriptor descriptor of the service, that made the call
* that is used to be mapped to the corresponding field on a server.
* @param serviceId id of the service, that made the call
* @return SharedFlow instance to be consumed.
*/
public fun <T> RpcClient.registerSharedFlowField(
serviceScope: CoroutineScope,
fieldName: String,
descriptor: RpcServiceDescriptor<*>,
serviceId: Long,
): SharedFlow<T> {
return RpcFlow.Shared(descriptor.fqName, initializeFlowField(serviceScope, fieldName, descriptor, serviceId))
}

/**
* Registers StateFlow<T> field of the interface. Sends initialization request, subscribes to emitted values
* and returns the instance of the flow to be consumed
*
* @param T type parameter for StateFlow
* @param serviceScope Service's coroutine scope
* @param fieldName the name of the field.
* @param descriptor descriptor of the service, that made the call
* that is used to be mapped to the corresponding field on a server.
* @param serviceId id of the service, that made the call
* @return StateFlow instance to be consumed.
*/
public fun <T> RpcClient.registerStateFlowField(
serviceScope: CoroutineScope,
fieldName: String,
descriptor: RpcServiceDescriptor<*>,
serviceId: Long,
): StateFlow<T> {
return RpcFlow.State(descriptor.fqName, initializeFlowField(serviceScope, fieldName, descriptor, serviceId))
}

private fun <T, FlowT : Flow<T>> RpcClient.initializeFlowField(
serviceScope: CoroutineScope,
fieldName: String,
descriptor: RpcServiceDescriptor<*>,
serviceId: Long,
): Deferred<FlowT> {
return callSync(serviceScope, RpcCall(descriptor, fieldName, FieldDataObject, serviceId))
}
Loading

0 comments on commit 7ac04ad

Please sign in to comment.