This repository has been archived by the owner on Jul 16, 2024. It is now read-only.

RSocket Kotlin Coroutines

linux_china edited this page Apr 16, 2020 · 3 revisions

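Kotlin is increasingly popular among Java developers, and its support for coroutines and Flow makes asynchronous programming more convenient. The Spring community supports Kotlin as well: Spring WebFlux supports Kotlin coroutines and Flow out of the box, so you can use them directly in a RestController.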

@RestController
@RequestMapping("/kt")
class CoroutineController {
    @Autowired
    private lateinit var userKotlinService: UserKotlinService

    @GetMapping("/hello")
    suspend fun hello(): String {
        return "Hello " + userKotlinService.getNickById(1)
    }

    @GetMapping("/stream1")
    fun stream1(): Flow<String> {
        return userKotlinService.findNamesByType(1)
    }
}

The RSocket Broker project also supports Kotlin coroutines and Flow, so you can write RSocket services with them directly. The steps are as follows:

  • First, create an RSocket service interface whose methods use the suspend keyword or a Flow return type:
interface UserKotlinService {
    //FNF: suspend with Unit return
    suspend fun job1()

    //Request/Response
    suspend fun getNickById(id: Int): String

    //Request/Stream
    fun findNamesByType(type: Int): Flow<String>

    //channel: Flow as param and return type 
    fun findNamesByIdFlow(idFlow: Flow<Int>): Flow<String>
}
  • Implement the RSocket service. The annotations are used exactly as in Java; everything else is standard Kotlin code.
@Service
@RSocketService(serviceInterface = UserKotlinService::class)
class UserKotlinServiceImpl : UserKotlinService {
    override suspend fun job1() {
        println("job1")
    }

    override suspend fun getNickById(id: Int): String {
        return "nick: $id"
    }

    override fun findNamesByType(type: Int): Flow<String> {
        return flowOf("first", "second", "type: $type")
    }

    override fun findNamesByIdFlow(idFlow: Flow<Int>): Flow<String> {
        return idFlow.map {
            "nick: $it"
        }
    }
}

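Once the application starts, the RSocket services written in Kotlin are registered with the Broker automatically, and other Kotlin applications can consume them.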

How It Works and Things to Note

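The underlying mechanism is not complicated. For suspend functions it mainly comes down to converting between Continuation and Mono; for Flow it is the conversion between Flow and Flux. Because Kotlin reflection is involved, you need to add the kotlin-reflect dependency. In the RSocket Broker project the Kotlin dependencies are optional and are not added to your project by default, so you must declare them in your own project, for example: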

    <dependencies>
        <dependency>
            <groupId>org.jetbrains.kotlin</groupId>
            <artifactId>kotlin-stdlib-jdk8</artifactId>
        </dependency>
        <dependency>
            <groupId>org.jetbrains.kotlinx</groupId>
            <artifactId>kotlinx-coroutines-core</artifactId>
        </dependency>
        <dependency>
            <groupId>org.jetbrains.kotlinx</groupId>
            <artifactId>kotlinx-coroutines-jdk8</artifactId>
        </dependency>
        <dependency>
            <groupId>org.jetbrains.kotlin</groupId>
            <artifactId>kotlin-reflect</artifactId>
        </dependency>
        <dependency>
            <groupId>org.jetbrains.kotlinx</groupId>
            <artifactId>kotlinx-coroutines-reactor</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.module</groupId>
            <artifactId>jackson-module-kotlin</artifactId>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.jetbrains.kotlin</groupId>
                <artifactId>kotlin-bom</artifactId>
                <version>1.3.71</version>
                <scope>import</scope>
                <type>pom</type>
            </dependency>
            <dependency>
                <groupId>org.jetbrains.kotlinx</groupId>
                <artifactId>kotlinx-coroutines-bom</artifactId>
                <version>1.3.5</version>
                <scope>import</scope>
                <type>pom</type>
            </dependency>
        </dependencies>
    </dependencyManagement>
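
To make the Continuation-to-Mono idea described above more concrete, here is a minimal sketch that uses only the Kotlin and Java standard libraries. It is not the Broker's actual implementation: `CompletableFuture` stands in for `Mono` so the example runs without Reactor, and `suspendToFuture` is a hypothetical helper name introduced purely for illustration. The point it shows is that a suspend function is started with a Continuation, and that Continuation's result is what gets forwarded to the reactive type.

```kotlin
import java.util.concurrent.CompletableFuture
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical helper (not part of RSocket Broker): starts a suspend block
// and exposes its outcome as a CompletableFuture, standing in for Mono.
fun <T> suspendToFuture(block: suspend () -> T): CompletableFuture<T> {
    val future = CompletableFuture<T>()
    block.startCoroutine(object : Continuation<T> {
        override val context: CoroutineContext = EmptyCoroutineContext

        // Called once when the suspend block finishes, with a value or an exception.
        override fun resumeWith(result: Result<T>) {
            result.fold(
                onSuccess = { future.complete(it) },
                onFailure = { future.completeExceptionally(it) }
            )
        }
    })
    return future
}

// Same shape as the request/response method in UserKotlinService.
suspend fun getNickById(id: Int): String = "nick: $id"

fun main() {
    val future = suspendToFuture { getNickById(1) }
    println(future.get()) // nick: 1
}
```

In a project that already has kotlinx-coroutines-reactor on the classpath, the `mono { ... }` builder and the `asFlux()` / `asFlow()` extensions from that library perform this kind of bridging for `Mono` and `Flux`.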

Kotlin serialization

If you want to use Kotlin serialization for JSON, Protobuf, or CBOR, RSocket Broker supports that as well.

  • Create the data class and annotate it with @Serializable:
@Serializable
data class Data(val a: Int, val b: String = "42")
  • Add the corresponding dependencies:
<dependency>
   <groupId>com.fasterxml.jackson.module</groupId>
   <artifactId>jackson-module-kotlin</artifactId>
</dependency>
<dependency>
   <groupId>org.jetbrains.kotlinx</groupId>
   <artifactId>kotlinx-serialization-runtime</artifactId>
   <version>0.20.0</version>
</dependency>
<dependency>
   <groupId>org.jetbrains.kotlinx</groupId>
   <artifactId>kotlinx-serialization-protobuf</artifactId>
   <version>0.20.0</version>
</dependency>
<dependency>
   <groupId>org.jetbrains.kotlinx</groupId>
   <artifactId>kotlinx-serialization-cbor</artifactId>
   <version>0.20.0</version>
</dependency>

For more information, see: https://github.com/Kotlin/kotlinx.serialization
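
As a rough illustration of what the `@Serializable` class above gives you, the following sketch round-trips `Data` through JSON. It assumes the 0.20.0 runtime listed in the dependencies and that the kotlinx.serialization compiler plugin is enabled in your build (without it, `Data.serializer()` is not generated). The `stringify` / `parse` names belong to that 0.20.0-era API; later releases renamed them to `encodeToString` / `decodeFromString`, so adjust the calls if you use a newer version. How the Broker invokes serialization internally is not shown here.

```kotlin
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.JsonConfiguration

@Serializable
data class Data(val a: Int, val b: String = "42")

fun main() {
    // JsonConfiguration.Stable is the stable preset in the 0.20.0 API.
    val json = Json(JsonConfiguration.Stable)

    // Encode with the plugin-generated serializer, then decode the text again.
    val text = json.stringify(Data.serializer(), Data(a = 1))
    val decoded = json.parse(Data.serializer(), text)

    println(text)
    println(decoded == Data(a = 1)) // data classes compare by value
}
```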

References

RSocket

Network Protocol

  • Binary: byte stream
  • Async message
  • Multi transports
  • Reactive Semantics

Symmetric interactions

  • request/response
  • request/stream
  • fire-and-forget
  • channel

Transports

  • TCP+TLS
  • WebSocket+TLS
  • UDP(Aeron)
  • RDMA

Polyglot
