This repository has been archived by the owner on Jul 16, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 167
RSocket Kotlin Coroutines
linux_china edited this page Apr 11, 2020
·
3 revisions
Kotlin已经越来越受到Java程序员的喜爱,其中Coroutines和Flow的支持让异步化操作更加便捷。 Spring社区也提供对Kotlin的支持,Spring WebFlux默认支持Kotlin Coroutines和Flow,你可以在RestController中直接使用。
@RestController
@RequestMapping("/kt")
class CoroutineController {
@Autowired
private lateinit var userKotlinService: UserKotlinService
@GetMapping("/hello")
suspend fun hello(): String {
return "Hello " + userKotlinService.getNickById(1)
}
@GetMapping("/stream1")
fun stream1(): Flow<String> {
return userKotlinService.findNamesByType(1)
}
}
在RSocket Broker项目中,我们也添加了对Kotlin的Coroutines和Flow的支持,你可以直接使用Kotlin Coroutines和Flow来编写RSocket服务。步骤如下:
- 首先创建一个RSocket服务接口,添加对应的suspend关键字或Flow返回类型,如下:
interface UserKotlinService {
//FNF: suspend with Unit return
suspend fun job1()
//Request/Response
suspend fun getNickById(id: Int): String
//Request/Stream
fun findNamesByType(type: Int): Flow<String>
//channel: Flow as param and return type
fun findNamesByIdFlow(idFlow: Flow<Int>): Flow<String>
}
- 实现RSocket服务: Annotation的使用和Java一致,其他就是标准的Kotlin代码。
@Service
@RSocketService(serviceInterface = UserKotlinService::class)
class UserKotlinServiceImpl : UserKotlinService {
override suspend fun job1() {
println("job1")
}
override suspend fun getNickById(id: Int): String {
return "nick: $id"
}
override fun findNamesByType(type: Int): Flow<String> {
return arrayOf("first", "second", "type: $type").asFlow();
}
override fun findNamesByIdFlow(idFlow: Flow<Int>): Flow<String> {
return idFlow.map {
"nick: $it"
}
}
}
应用启动后,Kotlin编写的RSocket服务会自动注册到Broker上,其他Kotlin应用也可以消费该服务啦。
背后的原理不复杂,对于suspend函数来说,主要就是Continuation到Mono之间转换,而Flow则是Flow和Flux之间的转换。 由于涉及到Kotlin的反射操作,所以你需要添加kotlin-reflect依赖。 在RSocket Broker项目中,Kotlin的依赖是Optional的,默认不会添加到你的项目依赖中,所以你需要在项目中自行添加对Kotlin的依赖,样例如下:
<dependencies>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib-jdk8</artifactId>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlinx</groupId>
<artifactId>kotlinx-coroutines-core</artifactId>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlinx</groupId>
<artifactId>kotlinx-coroutines-jdk8</artifactId>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-reflect</artifactId>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlinx</groupId>
<artifactId>kotlinx-coroutines-reactor</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-kotlin</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-bom</artifactId>
<version>1.3.71</version>
<scope>import</scope>
<type>pom</type>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlinx</groupId>
<artifactId>kotlinx-coroutines-bom</artifactId>
<version>1.3.5</version>
<scope>import</scope>
<type>pom</type>
</dependency>
</dependencies>
</dependencyManagement>
- Kotlin Coroutines: https://kotlinlang.org/docs/reference/coroutines-overview.html
- Asynchronous Flow: https://kotlinlang.org/docs/reference/coroutines/flow.html
- Binary: byte stream
- Async message
- Multi transports
- Reactive Semantics
- request/response
- request/stream
- fire-and-forget
- channel
- TCP+TLS
- WebSocket+TLS
- UDP(Aeron)
- RDMA