協程

Kotlin 協程是 Kotlin 的輕量級執行緒,允許以命令式方式編寫非阻塞程式碼。在語言方面,暫停函數為非同步操作提供了抽象,而在函式庫方面,kotlinx.coroutines 提供了諸如 async { } 之類的函數和諸如 Flow 之類的型別。

Spring Framework 在以下範圍內提供對協程的支援

  • Spring MVC 和 WebFlux 註解 @Controller 中支援 DeferredFlow 傳回值

  • Spring MVC 和 WebFlux 註解 @Controller 中支援暫停函數

  • WebFlux clientserver 函數式 API 的擴充功能。

  • WebFlux.fn coRouter { } DSL

  • WebFlux CoWebFilter

  • RSocket @MessageMapping 註解方法中支援暫停函數和 Flow

  • RSocketRequester 的擴充功能

  • Spring AOP

相依性

當類路徑中存在 kotlinx-coroutines-corekotlinx-coroutines-reactor 相依性時,會啟用協程支援

build.gradle.kts

dependencies {

	implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:${coroutinesVersion}")
	implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:${coroutinesVersion}")
}

支援 1.4.0 及以上版本。

Reactive 如何轉換為協程?

對於傳回值,從 Reactive 到協程 API 的轉換如下

  • fun handler(): Mono<Void> 變成 suspend fun handler()

  • fun handler(): Mono<T> 變成 suspend fun handler(): Tsuspend fun handler(): T?,取決於 Mono 是否可以為空 (優點是更靜態型別化)

  • fun handler(): Flux<T> 變成 fun handler(): Flow<T>

對於輸入參數

  • 如果不需要延遲載入,fun handler(mono: Mono<T>) 變成 fun handler(value: T),因為可以調用暫停函數來取得值參數。

  • 如果需要延遲載入,fun handler(mono: Mono<T>) 變成 fun handler(supplier: suspend () → T)fun handler(supplier: suspend () → T?)

Flow 在協程世界中相當於 Flux,適用於熱或冷串流、有限或無限串流,主要差異如下

  • Flow 是基於推送的,而 Flux 是推拉混合

  • 背壓透過暫停函數實作

  • Flow 只有一個 單個暫停 collect 方法,而運算子則實作為 擴充功能

  • 由於協程,運算子很容易實作

  • 擴充功能允許將自訂運算子新增到 Flow

  • Collect 操作是暫停函數

  • map 運算子 支援非同步操作 (不需要 flatMap),因為它採用暫停函數參數

閱讀這篇部落格文章 Going Reactive with Spring, Coroutines and Kotlin Flow 以取得更多詳細資訊,包括如何使用協程並行執行程式碼。

控制器

以下是協程 @RestController 的範例。

@RestController
class CoroutinesRestController(client: WebClient, banner: Banner) {

	@GetMapping("/suspend")
	suspend fun suspendingEndpoint(): Banner {
		delay(10)
		return banner
	}

	@GetMapping("/flow")
	fun flowEndpoint() = flow {
		delay(10)
		emit(banner)
		delay(10)
		emit(banner)
	}

	@GetMapping("/deferred")
	fun deferredEndpoint() = GlobalScope.async {
		delay(10)
		banner
	}

	@GetMapping("/sequential")
	suspend fun sequential(): List<Banner> {
		val banner1 = client
				.get()
				.uri("/suspend")
				.accept(MediaType.APPLICATION_JSON)
				.awaitExchange()
				.awaitBody<Banner>()
		val banner2 = client
				.get()
				.uri("/suspend")
				.accept(MediaType.APPLICATION_JSON)
				.awaitExchange()
				.awaitBody<Banner>()
		return listOf(banner1, banner2)
	}

	@GetMapping("/parallel")
	suspend fun parallel(): List<Banner> = coroutineScope {
		val deferredBanner1: Deferred<Banner> = async {
			client
					.get()
					.uri("/suspend")
					.accept(MediaType.APPLICATION_JSON)
					.awaitExchange()
					.awaitBody<Banner>()
		}
		val deferredBanner2: Deferred<Banner> = async {
			client
					.get()
					.uri("/suspend")
					.accept(MediaType.APPLICATION_JSON)
					.awaitExchange()
					.awaitBody<Banner>()
		}
		listOf(deferredBanner1.await(), deferredBanner2.await())
	}

	@GetMapping("/error")
	suspend fun error() {
		throw IllegalStateException()
	}

	@GetMapping("/cancel")
	suspend fun cancel() {
		throw CancellationException()
	}

}

也支援使用 @Controller 進行檢視渲染。

@Controller
class CoroutinesViewController(banner: Banner) {

	@GetMapping("/")
	suspend fun render(model: Model): String {
		delay(10)
		model["banner"] = banner
		return "index"
	}
}

WebFlux.fn

以下是透過 coRouter { } DSL 和相關處理器定義的協程路由器範例。

@Configuration
class RouterConfiguration {

	@Bean
	fun mainRouter(userHandler: UserHandler) = coRouter {
		GET("/", userHandler::listView)
		GET("/api/user", userHandler::listApi)
	}
}
class UserHandler(builder: WebClient.Builder) {

	private val client = builder.baseUrl("...").build()

	suspend fun listView(request: ServerRequest): ServerResponse =
			ServerResponse.ok().renderAndAwait("users", mapOf("users" to
			client.get().uri("...").awaitExchange().awaitBody<User>()))

	suspend fun listApi(request: ServerRequest): ServerResponse =
				ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).bodyAndAwait(
				client.get().uri("...").awaitExchange().awaitBody<User>())
}

交易

協程上的交易透過 Reactive 交易管理的程式化變體支援。

對於暫停函數,提供了 TransactionalOperator.executeAndAwait 擴充功能。

import org.springframework.transaction.reactive.executeAndAwait

class PersonRepository(private val operator: TransactionalOperator) {

    suspend fun initDatabase() = operator.executeAndAwait {
        insertPerson1()
        insertPerson2()
    }

    private suspend fun insertPerson1() {
        // INSERT SQL statement
    }

    private suspend fun insertPerson2() {
        // INSERT SQL statement
    }
}

對於 Kotlin Flow,提供了 Flow<T>.transactional 擴充功能。

import org.springframework.transaction.reactive.transactional

class PersonRepository(private val operator: TransactionalOperator) {

    fun updatePeople() = findPeople().map(::updatePerson).transactional(operator)

    private fun findPeople(): Flow<Person> {
        // SELECT SQL statement
    }

    private suspend fun updatePerson(person: Person): Person {
        // UPDATE SQL statement
    }
}