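Vert.x and Kotlin Coroutines: Notes on Asynchronous Programming

The previous part covered one way to implement asynchronous programming: keep registering handlers for read/write events with epoll, and invoke the matching handler once an event fires. In real-world projects, Kotlin coroutines are another way to get the same effect.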

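I. How It Works

The core advantage of coroutines is that they need fewer threads, which cuts the overhead of thread context switches. Combined with asynchronous I/O, this works out particularly well.
[Figure: coroutine overview]
As the figure above shows, when two clients connect to the service, the Acceptor thread first accepts them and establishes the socket connections, and an event-loop thread then handles the work for each connection. There are two coroutines here, C1 and C2. Partway through, each of them reaches logic that would otherwise block, so it is suspended and its pending I/O is registered with another event loop; it is resumed later, once the read event is ready. The benefit of coroutines shows up here: while C1 is waiting (the span between its suspend and its resume), the thread is free to run C2, which raises the service's QPS. With blocking calls, C2 could not start until C1 had finished completely (after C1's resume). Note also that AsyncHttpClient uses the high-performance epoll approach too; with a blocking client, overall performance would drop.
From the point of view of a single thread, each coroutine is wrapped into a task, placed in the thread's taskQueue, and the tasks are executed one after another.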
[Figure: coroutine schedule]
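The interleaving of C1 and C2 on one thread can be sketched with nothing but the Kotlin standard library. This is a minimal illustration, not how Vert.x is implemented: taskQueue, pendingIo, fakeIo and launchOnLoop are hypothetical names, and the "read event" is simulated by draining a second queue once the thread has nothing else to run.

```kotlin
import kotlin.coroutines.*

// All names here (taskQueue, pendingIo, fakeIo, ...) are hypothetical, for illustration only.
val taskQueue = ArrayDeque<() -> Unit>()   // tasks run one by one on the single "event-loop thread"
val pendingIo = ArrayDeque<() -> Unit>()   // I/O requests whose "read event" has not fired yet
val trace = mutableListOf<String>()

// Simulated non-blocking call: suspend now, get resumed via the task queue once the event fires.
suspend fun fakeIo(name: String): String = suspendCoroutine { cont ->
    pendingIo.addLast { taskQueue.addLast { cont.resume("$name-response") } }
}

// Wrap a coroutine as a task and put it on the queue.
fun launchOnLoop(name: String) {
    val body: suspend () -> Unit = {
        trace += "$name start"
        val response = fakeIo(name)
        trace += "$name got $response"
    }
    taskQueue.addLast { body.startCoroutine(Continuation(EmptyCoroutineContext) { it.getOrThrow() }) }
}

fun runDemo(): List<String> {
    trace.clear()
    launchOnLoop("C1")
    launchOnLoop("C2")
    while (taskQueue.isNotEmpty() || pendingIo.isNotEmpty()) {
        // Run every task that is ready; a suspended coroutine simply returns to this loop.
        while (taskQueue.isNotEmpty()) taskQueue.removeFirst().invoke()
        // Nothing left to run: pretend the oldest pending I/O just became ready.
        if (pendingIo.isNotEmpty()) pendingIo.removeFirst().invoke()
    }
    return trace.toList()
}

fun main() {
    runDemo().forEach(::println)
}
```

The trace comes out as "C1 start", "C2 start", "C1 got C1-response", "C2 got C2-response": C2 starts while C1 is still waiting for its response, which is exactly the gap a blocking call would have wasted.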

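II. Implementation Analysis

The code is the one from the performance test in Section III. First, decompile the class file generated for HttpVerticle with the following command: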

~/software/jd-cli-1.2.0-dist/jd-cli /Users/wudan03/software/netty-tutorial/target/classes/org/wudan/vertx/HttpVerticle\$start\$2\$1.class

The result is as follows:

@DebugMetadata(f = "HttpVerticle.kt", l = {31, 32}, i = {}, s = {}, n = {}, m = "invokeSuspend", c = "org.wudan.vertx.HttpVerticle$start$2$1")
@Metadata(mv = {1, 6, 0}, k = 3, xi = 48, d1 = {"\000\n\n\000\n\002\020\002\n\002\030\002\020\000\032\0020\001*\0020\002HŠ@"}, d2 = {"<anonymous>", "", "Lkotlinx/coroutines/CoroutineScope;"})
final class null extends SuspendLambda implements Function2<CoroutineScope, Continuation<? super Unit>, Object> {
    int label;

    null(RoutingContext $cont, Continuation $completion) {
        super(2, $completion);
    }

    @Nullable
    public final Object invokeSuspend(@NotNull Object $result) {
        Object object = IntrinsicsKt.getCOROUTINE_SUSPENDED();
        switch (this.label) {
            case 0:
                ResultKt.throwOnFailure(SYNTHETIC_LOCAL_VARIABLE_1);
                Intrinsics.checkNotNullExpressionValue(Future.fromCompletionStage(HttpVerticle.this.getHealthCheckFuture2()), "fromCompletionStage(getHealthCheckFuture2())");
                this.label = 1;
                if (VertxCoroutineKt.await(Future.fromCompletionStage(HttpVerticle.this.getHealthCheckFuture2()), (Continuation)this) == object)
                    return object;
                (Response)VertxCoroutineKt.await(Future.fromCompletionStage(HttpVerticle.this.getHealthCheckFuture2()), (Continuation)this);
                Intrinsics.checkNotNullExpressionValue(this.$cont.response().end("Ok!"), "cont.response().end(\"Ok!\")");
                this.label = 2;
                if (VertxCoroutineKt.await(this.$cont.response().end("Ok!"), (Continuation)this) == object)
                    return object;
                VertxCoroutineKt.await(this.$cont.response().end("Ok!"), (Continuation)this);
                return Unit.INSTANCE;
            case 1:
                ResultKt.throwOnFailure(SYNTHETIC_LOCAL_VARIABLE_1);
                (Response)SYNTHETIC_LOCAL_VARIABLE_1;
                Intrinsics.checkNotNullExpressionValue(this.$cont.response().end("Ok!"), "cont.response().end(\"Ok!\")");
                this.label = 2;
                if (VertxCoroutineKt.await(this.$cont.response().end("Ok!"), (Continuation)this) == object)
                    return object;
                VertxCoroutineKt.await(this.$cont.response().end("Ok!"), (Continuation)this);
                return Unit.INSTANCE;
            case 2:
                ResultKt.throwOnFailure(SYNTHETIC_LOCAL_VARIABLE_1);
                return Unit.INSTANCE;
        }
        throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
    }

    @NotNull
    public final Continuation<Unit> create(@Nullable Object value, @NotNull Continuation<? super null> $completion) {
        return (Continuation<Unit>)new Object(HttpVerticle.this, this.$cont, $completion);
    }

    @Nullable
    public final Object invoke(@NotNull CoroutineScope p1, @Nullable Continuation<?> p2) {
        return ((null)create(p1, p2)).invokeSuspend(Unit.INSTANCE);
    }
}

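0. Starting the Coroutine

The coroutine is started with the launch builder. Once started, it is handed to the Dispatcher for scheduling; you can verify this by setting a breakpoint at line 222 of DispatchedContinuation.kt. Scheduling simply means adding the coroutine to the thread's execution queue, and what eventually runs is the decompiled code above.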
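To see that starting a coroutine only enqueues it, here is a small sketch built on the standard library's ContinuationInterceptor, the hook that dispatchers build on. QueueDispatcher and dispatchDemo are hypothetical names, and the plain queue stands in for the event-loop thread's task queue; the real Vert.x and kotlinx.coroutines dispatchers do considerably more.

```kotlin
import kotlin.coroutines.*

// Hypothetical dispatcher: every start or resume is turned into a task on a queue.
class QueueDispatcher : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    val queue = ArrayDeque<() -> Unit>()

    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        Continuation(continuation.context) { result ->
            queue.addLast { continuation.resumeWith(result) }   // do not run now, enqueue instead
        }
}

fun dispatchDemo(): List<String> {
    val log = mutableListOf<String>()
    val dispatcher = QueueDispatcher()
    val body: suspend () -> Unit = { log += "coroutine body runs" }

    // startCoroutine goes through the interceptor, so the body has not run when it returns.
    body.startCoroutine(Continuation(dispatcher) { it.getOrThrow() })
    log += "after launch, queued tasks = ${dispatcher.queue.size}"

    // The "thread" drains its queue; only now does the coroutine body execute.
    while (dispatcher.queue.isNotEmpty()) dispatcher.queue.removeFirst().invoke()
    return log
}

fun main() {
    dispatchDemo().forEach(::println)
}
```

The log shows "after launch, queued tasks = 1" before "coroutine body runs", matching the description above: launching adds a task, and the thread executes it later.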

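1. How the Coroutine Suspends

On the first run, label is 0, so execution enters the case 0 branch. Normally the awaited Future is not complete yet, so await cannot return a value right away and instead returns IntrinsicsKt.getCOROUTINE_SUSPENDED(), which invokeSuspend passes on to its caller. That return value reaches the resumeWith method below, which checks it (the outcome check) and, on seeing COROUTINE_SUSPENDED, returns immediately. This run of the coroutine on the thread is then over, while the coroutine itself stays parked until it is resumed. That is the basic suspension process.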

// BaseContinuationImpl
public final override fun resumeWith(result: Result<Any?>) {
    // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
    var current = this
    var param = result
    while (true) {
        // Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
        // can precisely track what part of suspended callstack was already resumed
        probeCoroutineResumed(current)
        with(current) {
            val completion = completion!! // fail fast when trying to resume continuation without completion
            val outcome: Result<Any?> =
                try {
                    // Calls the invokeSuspend function from the decompiled code above
                    val outcome = invokeSuspend(param)
                    if (outcome === COROUTINE_SUSPENDED) return
                    Result.success(outcome)
                } catch (exception: Throwable) {
                    Result.failure(exception)
                }
            releaseIntercepted() // this state machine instance is terminating
            if (completion is BaseContinuationImpl) {
                // unrolling recursion via loop
                current = completion
                param = outcome
            } else {
                // top-level completion reached -- invoke and return
                completion.resumeWith(outcome)
                return
            }
        }
    }
}
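The COROUTINE_SUSPENDED marker can be observed directly with the low-level intrinsics in kotlin.coroutines.intrinsics. The sketch below is only an illustration: parked, awaitLater and suspendDemo are hypothetical names, and awaitLater stands in for an await whose result is not available yet.

```kotlin
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*

// Hypothetical holder for the continuation captured at the suspension point.
var parked: Continuation<String>? = null

// Stands in for an await() whose Future is not complete: remember the continuation and suspend.
suspend fun awaitLater(): String = suspendCoroutineUninterceptedOrReturn { cont ->
    parked = cont
    COROUTINE_SUSPENDED
}

fun suspendDemo(): List<String> {
    val log = mutableListOf<String>()
    val body: suspend () -> String = {
        log += "before await"
        val value = awaitLater()
        log += "after await: $value"
        "done"
    }
    val completion = Continuation<String>(EmptyCoroutineContext) { log += "completed: ${it.getOrThrow()}" }

    // The first run stops at awaitLater() and hands the marker back to the caller.
    val firstReturn = body.startCoroutineUninterceptedOrReturn(completion)
    log += "suspended: ${firstReturn === COROUTINE_SUSPENDED}"

    // Later, "the response is ready": resuming re-enters the state machine after the await.
    parked!!.resume("Ok!")
    return log
}

fun main() {
    suspendDemo().forEach(::println)
}
```

The log reads "before await", "suspended: true", "after await: Ok!", "completed: done". The second line is the same check that resumeWith performs on outcome before it returns.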

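2. How the Coroutine Resumes

In our code, the CompletableFuture is converted into a Vert.x Future and then awaited. As the source of await below shows, when the Future is not complete yet, await registers an onComplete callback; once the HTTP response is ready and the Future is completed, that callback resumes the coroutine by calling cont.resume: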

suspend fun <T> Future<T>.await(): T = when {
    succeeded() -> result()
    failed() -> throw cause()
    else -> suspendCancellableCoroutine { cont: CancellableContinuation<T> ->
        onComplete { asyncResult ->
            if (asyncResult.succeeded()) cont.resume(asyncResult.result() as T)
            else cont.resumeWithException(asyncResult.cause())
        }
    }
}

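The important work inside resume is calling dispatcher.dispatch, which wraps the coroutine as a task and puts it back on the thread's queue. When that task runs, the decompiled code above is entered again. Because the compiler adds labels to the coroutine's code, the re-entry jumps by label to the block that follows the suspension point and does not re-execute the earlier blocks.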
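The role of the label can be shown with a hand-written state machine that mimics the shape of the decompiled class. This is a simplified sketch with hypothetical names (HandlerStateMachine, SUSPENDED_MARKER); the real generated code also stores local variables in fields and, as seen above, falls through to the next step when an await returns immediately.

```kotlin
// Plays the role of COROUTINE_SUSPENDED in this sketch.
val SUSPENDED_MARKER = Any()

// Hand-written stand-in for the compiler-generated class; one branch per code block
// between suspension points.
class HandlerStateMachine(private val log: MutableList<String>) {
    private var label = 0

    fun invokeSuspend(result: Any?): Any? = when (label) {
        0 -> {                      // code before the first await
            log += "call backend"
            label = 1
            SUSPENDED_MARKER
        }
        1 -> {                      // code between the first and second await
            log += "backend said $result, write response"
            label = 2
            SUSPENDED_MARKER
        }
        2 -> {                      // code after the second await
            log += "response written"
            label = 3
            Unit
        }
        else -> error("state machine already finished")
    }
}

fun stateMachineDemo(): List<String> {
    val log = mutableListOf<String>()
    val machine = HandlerStateMachine(log)
    machine.invokeSuspend(null)     // first run, label 0
    machine.invokeSuspend("Ok!")    // resumed with the backend response, label 1
    machine.invokeSuspend(Unit)     // resumed after the write completed, label 2
    return log
}

fun main() {
    stateMachineDemo().forEach(::println)
}
```

Each call enters the same function, yet "call backend" is logged only once: the stored label sends every re-entry straight to the block after the last suspension point.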

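III. Performance Test

The code below starts an HTTP gateway on local port 8080 and proxies requests to port 9090 on a machine in the same network. (The code uses AsyncHttpClient because it is built on Netty; the earlier choice, retrofit + OkHttp, blocks on a thread pool and performed very poorly.)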

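package org.wudan.vertx

import io.vertx.core.DeploymentOptions
import io.vertx.core.Future
import io.vertx.core.Vertx
import io.vertx.core.VertxOptions
import io.vertx.ext.web.Router
import io.vertx.kotlin.coroutines.CoroutineVerticle
import io.vertx.kotlin.coroutines.await
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit
import kotlinx.coroutines.launch
import org.asynchttpclient.AsyncHttpClient
import org.asynchttpclient.Dsl
import org.asynchttpclient.Response

class HttpVerticle : CoroutineVerticle() {

    private val asyncClient: AsyncHttpClient = Dsl.asyncHttpClient(Dsl.config()
        // Keep the number of I/O threads small
        .setIoThreadsCount(8)
        .setProxyServer(Dsl.proxyServer("192.168.1.4", 9090)))

    // Non-blocking call to the backend; completes when the HTTP response arrives
    fun getHealthCheckFuture2(): CompletableFuture<Response?>? {
        return asyncClient.prepareGet("http://192.168.1.4:9090/healthcheck")
            .execute()
            .toCompletableFuture()
    }

    override suspend fun start() {
        val router = Router.router(vertx)
        router.route().handler { cont ->
            cont.request().pause()
            // One coroutine per request; it suspends at each await instead of blocking the event loop
            launch {
                val response = Future.fromCompletionStage(getHealthCheckFuture2()).await()
                cont.response().end("Ok!").await()
            }
        }
        vertx.createHttpServer().requestHandler(router).listen(8080).await()
        println("Server Starting!")
    }
}

suspend fun main() {
    val vertxOption = VertxOptions()
    vertxOption.maxEventLoopExecuteTime = TimeUnit.MILLISECONDS.toNanos(100)
    vertxOption.eventLoopPoolSize = 1 // run on a single event-loop thread

    val options = DeploymentOptions()
    options.instances = 1 // run a single Verticle instance
    val deployVerticle = Vertx.vertx(vertxOption).deployVerticle("org.wudan.vertx.HttpVerticle", options).await()
}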

Port 9090 runs a simple HTTP server written in Go (main.go). Go was chosen because it makes it easy to write an HTTP server that handles high concurrency.

package main

import (
	"fmt"
	"log"
	"net/http"
	"time"
)

func helloHandler(w http.ResponseWriter, r *http.Request) {
	time.Sleep(100 * time.Millisecond)
	fmt.Fprintf(w, "Hello World!")
}

func healthcheckHandler(w http.ResponseWriter, r *http.Request) {
	time.Sleep(100 * time.Millisecond)
	fmt.Fprintf(w, "Ok!")
}

func main() {
	fileServer := http.FileServer(http.Dir("./static"))
	http.Handle("/", fileServer)
	http.HandleFunc("/hello", helloHandler)
	http.HandleFunc("/healthcheck", healthcheckHandler)

	fmt.Printf("Starting server at port 9090\n")
	if err := http.ListenAndServe(":9090", nil); err != nil {
		log.Fatal(err)
	}
}

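To make the effect clear, we compared this gateway with Nginx, load testing both with the command below and comparing the results (this was run on my own computer, so QPS cannot go very high; the concurrency tops out at 100).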

ab -n 10000 -c 100 -k "http://127.0.0.1:8080/healthcheck"

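The gateway and Nginx were tested separately. Both ran as a single thread/process, and Nginx had daemon enabled. Below is the result for the gateway: with 100 concurrent connections (ab's -c 100), each connection completes about 7.4 requests per second on average (each request takes a little over 100 ms to return).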

Server Software:
Server Hostname:        192.168.1.3
Server Port:            8080

Document Path:          /healthcheck
Document Length:        3 bytes

Concurrency Level:      100
Time taken for tests:   13.406 seconds
Complete requests:      10000
Failed requests:        0
Keep-Alive requests:    10000
Total transferred:      650000 bytes
HTML transferred:       30000 bytes
Requests per second:    745.94 [#/sec] (mean)
Time per request:       134.058 [ms] (mean)
Time per request:       1.341 [ms] (mean, across all concurrent requests)
Transfer rate:          47.35 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    0   2.4      0      28
Processing:   106  132  13.0    129     199
Waiting:      106  132  13.0    129     199
Total:        106  132  13.7    129     199

Percentage of the requests served within a certain time (ms)
  50%    129
  66%    134
  75%    139
  80%    142
  90%    150
  95%    156
  98%    167
  99%    186
 100%    199 (longest request)
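The per-connection rate and the overall QPS in the output above are tied together by simple arithmetic: each connection sends its next request only after the previous one returns, so it completes 1000 / (mean latency in ms) requests per second, and the total is that rate times the concurrency. A small sketch with hypothetical helper names, fed with the numbers ab printed:

```kotlin
// Hypothetical helpers; inputs are taken from the ab output above.
fun perConnectionRate(meanLatencyMs: Double): Double = 1000.0 / meanLatencyMs

fun expectedQps(concurrency: Int, meanLatencyMs: Double): Double =
    concurrency * perConnectionRate(meanLatencyMs)

fun main() {
    // "Time per request: 134.058 [ms] (mean)" with "Concurrency Level: 100"
    println(perConnectionRate(134.058))   // roughly 7.46 requests per second per connection
    println(expectedQps(100, 134.058))    // roughly 746, in line with the reported 745.94
}
```

Since the backend sleeps 100 ms per request, the ceiling at this concurrency is 100 * 1000 / 100 = 1000 QPS; the remaining gap comes from the roughly 34 ms of extra latency per request measured in this run.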

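Below is the result for Nginx. Nginx was extremely slow on the last batch of requests (the cause still needs investigation), which makes its numbers worse.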

Server Software:        nginx/1.8.1
Server Hostname:        192.168.1.3
Server Port:            8080

Document Path:          /healthcheck
Document Length:        3 bytes

Concurrency Level:      100
Time taken for tests:   45.134 seconds
Complete requests:      10000
Failed requests:        17
   (Connect: 0, Receive: 0, Length: 17, Exceptions: 0)
Non-2xx responses:      17
Keep-Alive requests:    9960
Total transferred:      1649167 bytes
HTML transferred:       39078 bytes
Requests per second:    221.56 [#/sec] (mean)
Time per request:       451.336 [ms] (mean)
Time per request:       4.513 [ms] (mean, across all concurrent requests)
Transfer rate:          35.68 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    0    3.4      0      39
Processing:   116  421 2375.9    177   26593
Waiting:      116  421 2375.9    177   26593
Total:        116  421 2375.9    177   26593

Percentage of the requests served within a certain time (ms)
  50%    177
  66%    187
  75%    194
  80%    198
  90%    210
  95%    223
  98%    246
  99%  13434
 100%  26593 (longest request)

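In theory a test with higher concurrency is needed, but since a 10K QPS load test of the gateway is planned for next week, I will leave it for now; the current results already meet expectations.
Overall, in this test our Vert.x + Kotlin Coroutine HTTP gateway performed better than Nginx. This may be related to its use of multiple I/O threads; the memory footprint of coroutines may also need further evaluation.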

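IV. Summary

  • Coroutines are not scheduled by time slices the way threads are; they are event-driven. So make sure no coroutine performs a long-running operation while it executes, or it will hold up the other coroutines.
  • Through suspend and resume, Vert.x achieves roughly the same effect as the event-based approach from the previous part, which shows that the programming model matters a great deal.
  • There is much more about coroutines worth exploring, such as exception handling, cancellation, and timeouts.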
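The first point can be made concrete with a toy single-threaded loop. This is a sketch under stated assumptions: MiniLoop, work, yieldToLoop and fastStartTime are hypothetical names, and time is a virtual clock that only advances when a coroutine does CPU-bound work.

```kotlin
import kotlin.coroutines.*

// Hypothetical single-threaded loop with a virtual clock.
class MiniLoop {
    private val queue = ArrayDeque<() -> Unit>()
    var clock = 0                                   // virtual milliseconds

    fun launch(block: suspend MiniLoop.() -> Unit) {
        queue.addLast { block.startCoroutine(this, Continuation(EmptyCoroutineContext) { it.getOrThrow() }) }
    }

    fun work(ms: Int) { clock += ms }               // CPU-bound work that occupies the thread

    // Suspend and go to the back of the queue, letting other tasks run first.
    suspend fun yieldToLoop(): Unit = suspendCoroutine { cont -> queue.addLast { cont.resume(Unit) } }

    fun run() { while (queue.isNotEmpty()) queue.removeFirst().invoke() }
}

// Returns the virtual time at which the second, short coroutine gets to run.
fun fastStartTime(cooperative: Boolean): Int {
    val loop = MiniLoop()
    var fastStartedAt = -1
    loop.launch {
        repeat(10) {
            work(10)                                // 100 ms of work in total
            if (cooperative) yieldToLoop()
        }
    }
    loop.launch { fastStartedAt = clock }
    loop.run()
    return fastStartedAt
}

fun main() {
    println(fastStartTime(cooperative = false))     // 100: had to wait for all of the slow work
    println(fastStartTime(cooperative = true))      // 10: ran as soon as the slow coroutine yielded
}
```

Nothing preempts the slow coroutine; the other one gets the thread only when the slow one suspends. In the gateway, that means keeping blocking or CPU-heavy work out of the handler coroutines.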
