協程
Kotlin 協程是 Kotlin 的輕量級執行緒,允許以命令式方式編寫非阻塞程式碼。在語言方面,暫停函數為非同步操作提供了抽象,而在函式庫方面,kotlinx.coroutines 提供了諸如 async { }
之類的函數和諸如 Flow
之類的型別。
Spring Framework 在以下範圍內提供對協程的支援
-
Spring MVC 和 WebFlux 註解
@Controller
中支援暫停函數 -
WebFlux.fn coRouter { } DSL
-
WebFlux
CoWebFilter
-
RSocket
@MessageMapping
註解方法中支援暫停函數和Flow
-
RSocketRequester
的擴充功能 -
Spring AOP
相依性
當類路徑中存在 kotlinx-coroutines-core
和 kotlinx-coroutines-reactor
相依性時,會啟用協程支援
build.gradle.kts
dependencies {
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:${coroutinesVersion}")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:${coroutinesVersion}")
}
支援 1.4.0
及以上版本。
Reactive 如何轉換為協程?
對於傳回值,從 Reactive 到協程 API 的轉換如下
-
fun handler(): Mono<Void>
變成suspend fun handler()
-
fun handler(): Mono<T>
變成suspend fun handler(): T
或suspend fun handler(): T?
,取決於Mono
是否可以為空 (優點是更靜態型別化) -
fun handler(): Flux<T>
變成fun handler(): Flow<T>
對於輸入參數
-
如果不需要延遲載入,
fun handler(mono: Mono<T>)
變成fun handler(value: T)
,因為可以調用暫停函數來取得值參數。 -
如果需要延遲載入,
fun handler(mono: Mono<T>)
變成fun handler(supplier: suspend () → T)
或fun handler(supplier: suspend () → T?)
Flow
在協程世界中相當於 Flux
,適用於熱或冷串流、有限或無限串流,主要差異如下
-
Flow
是基於推送的,而Flux
是推拉混合 -
背壓透過暫停函數實作
-
Flow
只有一個 單個暫停collect
方法,而運算子則實作為 擴充功能 -
由於協程,運算子很容易實作
-
擴充功能允許將自訂運算子新增到
Flow
-
Collect 操作是暫停函數
-
map
運算子 支援非同步操作 (不需要flatMap
),因為它採用暫停函數參數
閱讀這篇部落格文章 Going Reactive with Spring, Coroutines and Kotlin Flow 以取得更多詳細資訊,包括如何使用協程並行執行程式碼。
控制器
以下是協程 @RestController
的範例。
@RestController
class CoroutinesRestController(client: WebClient, banner: Banner) {
@GetMapping("/suspend")
suspend fun suspendingEndpoint(): Banner {
delay(10)
return banner
}
@GetMapping("/flow")
fun flowEndpoint() = flow {
delay(10)
emit(banner)
delay(10)
emit(banner)
}
@GetMapping("/deferred")
fun deferredEndpoint() = GlobalScope.async {
delay(10)
banner
}
@GetMapping("/sequential")
suspend fun sequential(): List<Banner> {
val banner1 = client
.get()
.uri("/suspend")
.accept(MediaType.APPLICATION_JSON)
.awaitExchange()
.awaitBody<Banner>()
val banner2 = client
.get()
.uri("/suspend")
.accept(MediaType.APPLICATION_JSON)
.awaitExchange()
.awaitBody<Banner>()
return listOf(banner1, banner2)
}
@GetMapping("/parallel")
suspend fun parallel(): List<Banner> = coroutineScope {
val deferredBanner1: Deferred<Banner> = async {
client
.get()
.uri("/suspend")
.accept(MediaType.APPLICATION_JSON)
.awaitExchange()
.awaitBody<Banner>()
}
val deferredBanner2: Deferred<Banner> = async {
client
.get()
.uri("/suspend")
.accept(MediaType.APPLICATION_JSON)
.awaitExchange()
.awaitBody<Banner>()
}
listOf(deferredBanner1.await(), deferredBanner2.await())
}
@GetMapping("/error")
suspend fun error() {
throw IllegalStateException()
}
@GetMapping("/cancel")
suspend fun cancel() {
throw CancellationException()
}
}
也支援使用 @Controller
進行檢視渲染。
@Controller
class CoroutinesViewController(banner: Banner) {
@GetMapping("/")
suspend fun render(model: Model): String {
delay(10)
model["banner"] = banner
return "index"
}
}
WebFlux.fn
以下是透過 coRouter { } DSL 和相關處理器定義的協程路由器範例。
@Configuration
class RouterConfiguration {
@Bean
fun mainRouter(userHandler: UserHandler) = coRouter {
GET("/", userHandler::listView)
GET("/api/user", userHandler::listApi)
}
}
class UserHandler(builder: WebClient.Builder) {
private val client = builder.baseUrl("...").build()
suspend fun listView(request: ServerRequest): ServerResponse =
ServerResponse.ok().renderAndAwait("users", mapOf("users" to
client.get().uri("...").awaitExchange().awaitBody<User>()))
suspend fun listApi(request: ServerRequest): ServerResponse =
ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).bodyAndAwait(
client.get().uri("...").awaitExchange().awaitBody<User>())
}
交易
協程上的交易透過 Reactive 交易管理的程式化變體支援。
對於暫停函數,提供了 TransactionalOperator.executeAndAwait
擴充功能。
import org.springframework.transaction.reactive.executeAndAwait
class PersonRepository(private val operator: TransactionalOperator) {
suspend fun initDatabase() = operator.executeAndAwait {
insertPerson1()
insertPerson2()
}
private suspend fun insertPerson1() {
// INSERT SQL statement
}
private suspend fun insertPerson2() {
// INSERT SQL statement
}
}
對於 Kotlin Flow
,提供了 Flow<T>.transactional
擴充功能。
import org.springframework.transaction.reactive.transactional
class PersonRepository(private val operator: TransactionalOperator) {
fun updatePeople() = findPeople().map(::updatePerson).transactional(operator)
private fun findPeople(): Flow<Person> {
// SELECT SQL statement
}
private suspend fun updatePerson(person: Person): Person {
// UPDATE SQL statement
}
}