0%

简介

最近工作中涉及到Envoy相关的内容,主要功能是做最大流量的CircuitBreak,这里简单介绍下使用方式,给出一个例子

安装和启动

先下载Envoy

1
2
git clone https://github.com/envoyproxy/envoy.git
cd envoy/examples/front-proxy

添加下面的配置到service-envoy.yaml配置文件中

1
2
3
4
5
circuit_breakers:
thresholds:
max_connections: 1
max_pending_requests: 1
max_requests: 1

启动Envoy以及两个服务

1
2
docker-compose pull
docker-compose up --build -d

验证Envoy启动正常

1
2
curl -v localhost:8080/service/1
curl -v localhost:8080/service/2

验证CircuitBreak

首先安装wrk

1
2
3
git clone https://github.com/wg/wrk.git
cd wrk
make

然后执行压测

1
./wrk -t10 -c4 -d120s http://127.0.0.1:8080/service/1

在另外一个Terminal中执行tcpdump查看http return code(主要是wrk不提供返回码的统计)

1
sudo tcpdump -i lo0 -A -vvv port 8080 | grep "service/1"

通过结果可以看到返回码为503,同时,还增加了一个header:

1
x-envoy-overloaded: true

表示Envoy在触发circuit break后会返回503

Reference

  1. Envoy Front End Proxy
  2. envoy-proxy-demo
  3. wrk

简介

本次晋升虽然通过了,但是整个过程有一些经验和反思值得记录,这些东西是通用的。

前期准备

因为同时在准备面试,整个准备的时间实际是比较少的,总共时间不超过5天,主要还是之前做的一些工作记录了文档,这也体现了平时记文档是非常有必要的。
前期预答辩不超过5次,在答辩前一天早上我还在练习,整个过程其实是极限操作了,连续两个晚上凌晨三四点睡,主要的原因还是之前在准备面试,没时间搞答辩用的材料。
因为做的事情很多都还没到收益的时间,只讲了notification的重构和双AZ部署,以及网关项目,整体的问题是收益点较少(或者说我没整理出来收益点,这一点如果是强哥他是可以整理出来的),但是技术点还是非常多的,也是听了强哥的建议吧技术点做的突出了一些。

答辩

我答辩之前几十分钟还在练习ppt,最终导致我在答辩的时候非常流利,声音也比较洪亮,整体会给评委一种对项目很熟悉、工作思考的很深的感觉。
而且在答辩过程,对于一些细节点,我也是对答如流,这些东西只有当时解决这个问题的时候是自己充分想过才能答的好,说明平时的积累还是挺重要的。
这里面可能也可能因为评委我都认识,其中三位是平时交流比较多的,一位是之前交流过的,另外一位是没怎么交流的。所以自己也比较放得开。

优点(评委点评)

优点评委说了很多,我觉得都比较客观,总结来说分以下几点

  • 业务、技术能力强,技术细节很清楚,有一定技术视野,对整个业务的认知比较全面

整体评价还是比较中肯,没有夸大和贬低的嫌疑。

建议(评委点评)

惊奇的发现相比其他答辩的同学,我在劣势那一栏竟然是没有文字的,别的同学在劣势那一栏是有具体的文字的,而我只在建议那一栏有文字。

  • 技术细节关注过多,宏观层面、系统层面看问题还需要加强,以及如何通过技术获取收益比较缺乏

这个评价整体来说也是中肯,因为三个项目的收益却是较少,但是技术方面的东西还是比较多的,但是我觉得技术也算是收益,因为我们平时就是和代码打交道,代码更好了,平时的效率也会更高。(引用自强哥)有时候给公司带来收益的项目,可能很简单,谁做了都可以拿到收益(例如春节活动),有的事因事成人,我其实想体现的事因人成事。

总结

  • 平时应该多总结,把一些工作记录下来,这样找工作或者晋升的时候能直接用,而不需要费力去回忆。
  • 熟练度问题;要充分练习,根据职级的要求,预测评委可能问出的问题,并提前准备问题答案,多练习,我这次答辩算是非常流利了,因为我对整个事情的熟悉程度非常高,而且最近在面试,练的我口才也贼好。
  • 结果导向;把背景、工作、成果这些主要的点写清楚了,同时要扼要,把你影响最大的部分表现出来,这是最重要的,而不是说要把项目写的多厉害,我们的目标是在15分钟内把项目关键点讲清楚,让评委知道你做了对公司有用的事情,值得给你晋升。并非每个人做的项目都是非常全面完美的,我们需要突出自己项目的优点出来,这样才可以和别的人进行比较。

感谢安杰和强哥提供指导,其实拿到强哥之前的答辩PPT就已经很有收获了,强哥还给我指导了一些地方的写法,他说的那些点我基本都改了。
整个答辩过程让我体验了一把拼尽全力,做成一件事情的感觉。感觉我之前做事情还不是太尽力,这次积累了经验之后,对以后做事情还是非常有帮助的。

简介

这篇文章和技术无关,主要是想总结下最近在人员管理或者说协作方面的一些感悟,最近发现总结是非常有用的,一方面是提炼一下最近积累的知识、想法,另一方面可以记录下来以后要是忘了还可以再次回顾,所以这里简单总结下。

目标

每个人都有自己的职业发展目标,但是工作安排,或者说你的老板会有一个对你工作期望的目标。这两个目标可能不重叠,存在gap,比较好的做法是,两边都互相靠近一下,个人的职业技术发展,多往老板对你的工作期望靠,同时,老板在安排工作时,能够尽量考虑到员工的个人职业发展,当然还是以前者为主,老板的主要目标还是完成既定的任务。提供给员工机会达成他的职业发展目标只是顺带做的事情。

放手

尽量放手让下面的同事去做,去设计具体的方案以及编写代码,但是前提是我们要对他的方案、代码把关,具体就是方案评审和代码review,在这些关键点控制好质量,一方面是可以体现你对对方的信任,另一方面我们通过关键点的控制也保证了质量,可谓一举两得。

问题感知

一定要感知到下面的同事遇到的问题,这是管理过程非常重要的点,因为问题会阻碍他的工作进展速度,同时打击工作积极性,我们需要将其不能解决的问题,要么通过我们更广阔的技术视野、更强的技术能力提供一个解决办法,要么通过我们丰富的项目经验,提供一个替代方案解决问题,做tradeoff。

达成共识

总的来说,技术需要遵循一套标准,例如代码规范,我们需要遵循这些标准,当下面的同事没有遵循的时候,需要告诉他这个问题,但是可能有的标准对方不认,我们无法强行让对方遵循这些规范。一种方式是我们控制设计方案或者代码的准入,如果达不到规范就不能落地方案或者提交合并代码,另一种方式是在公司的规范里引入,例如如果代码如果没有遵循规范,被sonar检测出来了,就不能合并之类的。这种达成共识还是比较主观的,分人,有的人会遵循相同的规范,有的新手可能不会,这种东西就是不遵守不会出什么故障,但是遵守的话效率会更高,无法强制施加给对方。

分歧

通常管理者和被管理者在一个问题上都会存在一定的分歧,并不一定要解决分歧,可以允许分歧存在,但是要服务于最终的目标。

Reference

上一节讲到了异步编程的一种实现方式,就是不断的去往epoll里添加read/write事件监听的handler,并在事件发生后调用对应的handler。在实际工程中,Kotlin Coroutine是另外一种实现方式。

一、如何实现

协程的核心优势是减少线程数量,从而减少线程切换带来的overhead,而配合上异步编程,碰撞出了别样的火花。
coroutine overview
如上图,当两个Client分别连接到服务的时候,首先是被Acceptor线程接收并建立socket连接,然后通过eventloop线程来处理连接任务,这里会有两个Coroutine分别是C1和C2,注意到二者在执行到一半的时候,都会遇到阻塞的逻辑,从而被suspend,然后添加到另外一个eventloop中,并在之后读事件ready的时候被resume,协程的好处在这里的体现是,在阻塞的时候(C1的suspend和resume中间这段)线程可以被C2使用到,运行C2的逻辑,从而讲服务的QPS提高了,如果是阻塞的方式,C2则必须要在C1被执行完之后(C1 resume)才能执行。这里同时需要注意asyncHttpClient也使用的是高性能的epoll方式,如果是其他blocking的方式,整体的性能会下降。
从单个线程角度上看,其实协程是被包装成了一个一个Task,并放到线程的taskQueue中被依次执行的
coroutine schedule

二、实现分析

代码使用第四部分性能测试中的代码,我们先使用下面的反编译命令将HttpVerticle的class文件进行反编译

1
~/software/jd-cli-1.2.0-dist/jd-cli /Users/wudan03/software/netty-tutorial/target/classes/org/wudan/vertx/HttpVerticle\$start\$2\$1.class

结果如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@DebugMetadata(f = "HttpVerticle.kt", l = {31, 32}, i = {}, s = {}, n = {}, m = "invokeSuspend", c = "org.wudan.vertx.HttpVerticle$start$2$1")
@Metadata(mv = {1, 6, 0}, k = 3, xi = 48, d1 = {"\000\n\n\000\n\002\020\002\n\002\030\002\020\000\032\0020\001*\0020\002HŠ@"}, d2 = {"<anonymous>", "", "Lkotlinx/coroutines/CoroutineScope;"})
final class null extends SuspendLambda implements Function2<CoroutineScope, Continuation<? super Unit>, Object> {
int label;

null(RoutingContext $cont, Continuation $completion) {
super(2, $completion);
}

@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
Object object = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch (this.label) {
case 0:
ResultKt.throwOnFailure(SYNTHETIC_LOCAL_VARIABLE_1);
Intrinsics.checkNotNullExpressionValue(Future.fromCompletionStage(HttpVerticle.this.getHealthCheckFuture2()), "fromCompletionStage(getHealthCheckFuture2())");
this.label = 1;
if (VertxCoroutineKt.await(Future.fromCompletionStage(HttpVerticle.this.getHealthCheckFuture2()), (Continuation)this) == object)
return object;
(Response)VertxCoroutineKt.await(Future.fromCompletionStage(HttpVerticle.this.getHealthCheckFuture2()), (Continuation)this);
Intrinsics.checkNotNullExpressionValue(this.$cont.response().end("Ok!"), "cont.response().end(\"Ok!\")");
this.label = 2;
if (VertxCoroutineKt.await(this.$cont.response().end("Ok!"), (Continuation)this) == object)
return object;
VertxCoroutineKt.await(this.$cont.response().end("Ok!"), (Continuation)this);
return Unit.INSTANCE;
case 1:
ResultKt.throwOnFailure(SYNTHETIC_LOCAL_VARIABLE_1);
(Response)SYNTHETIC_LOCAL_VARIABLE_1;
Intrinsics.checkNotNullExpressionValue(this.$cont.response().end("Ok!"), "cont.response().end(\"Ok!\")");
this.label = 2;
if (VertxCoroutineKt.await(this.$cont.response().end("Ok!"), (Continuation)this) == object)
return object;
VertxCoroutineKt.await(this.$cont.response().end("Ok!"), (Continuation)this);
return Unit.INSTANCE;
case 2:
ResultKt.throwOnFailure(SYNTHETIC_LOCAL_VARIABLE_1);
return Unit.INSTANCE;
}
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}

@NotNull
public final Continuation<Unit> create(@Nullable Object value, @NotNull Continuation<? super null> $completion) {
return (Continuation<Unit>)new Object(HttpVerticle.this, this.$cont, $completion);
}

@Nullable
public final Object invoke(@NotNull CoroutineScope p1, @Nullable Continuation<?> p2) {
return ((null)create(p1, p2)).invokeSuspend(Unit.INSTANCE);
}
}

0. 协程启动

通过 launch 命令启动协程,协程启动之后会被加入到Dispatcher中被调度,可以在DispatchedContinuation.kt中line222打断点验证,整个调度过程其实就是将其加入到线程的执行队列中,执行的就是上面反编译后的代码

1. 协程挂起分析

首次执行的时候,label是0,进入case0逻辑,正常情况下之后的await会失败,导致协程返回 IntrinsicsKt.getCOROUTINE_SUSPENDED(),返回的状态会被下面的resumeWith方法拿到,并进行判断(outcome的判断那部分),判断为 COROUTINE_SUSPENDED 之后函数直接返回了,也就是表示这个coroutine的执行结束了,这也就是基本的挂起过程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// BaseContinuationImpl
public final override fun resumeWith(result: Result<Any?>) {
// This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
var current = this
var param = result
while (true) {
// Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
// can precisely track what part of suspended callstack was already resumed
probeCoroutineResumed(current)
with(current) {
val completion = completion!! // fail fast when trying to resume continuation without completion
val outcome: Result<Any?> =
try {
// 调用上面反编译代码的 invokeSuspend 函数
val outcome = invokeSuspend(param)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) {
// unrolling recursion via loop
current = completion
param = outcome
} else {
// top-level completion reached -- invoke and return
completion.resumeWith(outcome)
return
}
}
}
}

2. 协程恢复分析

由于我们在代码中设置了await,并且await被转换成了Vertx的Future,在http response ready的时候,通过Future的set操作触发从而resume协程(通过调用cont.resume),

1
2
3
4
5
6
7
8
9
10
suspend fun <T> Future<T>.await(): T = when {
succeeded() -> result()
failed() -> throw cause()
else -> suspendCancellableCoroutine { cont: CancellableContinuation<T> ->
onComplete { asyncResult ->
if (asyncResult.succeeded()) cont.resume(asyncResult.result() as T)
else cont.resumeWithException(asyncResult.cause())
}
}
}

resume里面做的重要的工作是调用dispatcher.dispatch把协程包装成任务重新加到线程的队列中去执行。执行的过程,上述反编译的代码被重新执行,由于协程的代码被编译后会加入一些label,在重新执行的时候会根据label去执行resume之后的代码快,并不会重复执行之前的代码块。

三、性能测试

使用下面的代码,在本地8080端口启动一个HTTP 网关,将请求代理到同一网络下机器的9090端口(代码中选用了acyncHttpClient是因为它是基于netty的,之前选用retrofit+OkHttp是线程池阻塞的,效果特别差)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class HttpVerticle : CoroutineVerticle() {

private val acyncClient : AsyncHttpClient = Dsl.asyncHttpClient(Dsl.config()
// I/O线程限制不要太大
.setIoThreadsCount(8)
.setProxyServer(Dsl.proxyServer("192.168.1.4", 9090)))

fun getHealthCheckFuture2() : CompletableFuture<Response?>? {
return acyncClient.prepareGet("http://192.168.1.4:9090/healthcheck")
.execute()
.toCompletableFuture()
}

override suspend fun start() {
var router = Router.router(vertx)
router.route().handler { cont ->
cont.request().pause()
launch {
val response = Future.fromCompletionStage(getHealthCheckFuture2()).await()
cont.response().end("Ok!").await()
}
}
vertx.createHttpServer().requestHandler(router).listen(8080).await()
println("Server Starting!")
}
}

suspend fun main() {
var vertxOption = VertxOptions()
vertxOption.maxEventLoopExecuteTime = TimeUnit.MILLISECONDS.toNanos(100)
vertxOption.eventLoopPoolSize = 1 // 单线程运行

var options = DeploymentOptions()
options.instances = 1 // 单Verticle运行
var deployVerticle = Vertx.vertx(vertxOption).deployVerticle("org.wudan.vertx.HttpVerticle", options).await()
}

9090端口启动的是一个golang的简单HTTP Server(main.go),之所以选择go是因为它可以轻松写出并发很高的HTTP Server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package main

import (
"fmt"
"log"
"time"
"net/http"
)

func helloHandler(w http.ResponseWriter, r *http.Request) {
time.Sleep(100 * time.Millisecond)
fmt.Fprintf(w, "Hello World!")
}

func healthcheckHandler(w http.ResponseWriter, r *http.Request) {
time.Sleep(100 * time.Millisecond)
fmt.Fprintf(w, "Ok!")
}

func main() {
fileServer := http.FileServer(http.Dir("./static"))
http.Handle("/", fileServer)
http.HandleFunc("/hello", helloHandler)
http.HandleFunc("/healthcheck", healthcheckHandler)

fmt.Printf("Starting server at port 9090\n")
if err := http.ListenAndServe(":9090", nil); err != nil {
log.Fatal(err)
}
}

这里为了明确效果,我们将这个网关和Nginx做了对比,使用下面的命令进行压测并对比结果(自己的电脑,QPS上不去,最多只能100 Concurrency)

1
ab -n 10000 -c 100 -k "http://127.0.0.1:8080/healthcheck"

分别压测网关和Nginx,两者都是单线程/进程,同时Nginx开启了deamon。下面是网关的结果,基本可以看到100个线程,平均每个线程1秒可以完成7.4个请求(每个请求需要100多毫秒才能返回),

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
Server Software:
Server Hostname: 192.168.1.3
Server Port: 8080

Document Path: /healthcheck
Document Length: 3 bytes

Concurrency Level: 100
Time taken for tests: 13.406 seconds
Complete requests: 10000
Failed requests: 0
Keep-Alive requests: 10000
Total transferred: 650000 bytes
HTML transferred: 30000 bytes
Requests per second: 745.94 [#/sec] (mean)
Time per request: 134.058 [ms] (mean)
Time per request: 1.341 [ms] (mean, across all concurrent requests)
Transfer rate: 47.35 [Kbytes/sec] received

Connection Times (ms)
min mean[+/-sd] median max
Connect: 0 0 2.4 0 28
Processing: 106 132 13.0 129 199
Waiting: 106 132 13.0 129 199
Total: 106 132 13.7 129 199

Percentage of the requests served within a certain time (ms)
50% 129
66% 134
75% 139
80% 142
90% 150
95% 156
98% 167
99% 186
100% 199 (longest request)

下面是Nginx的结果,Nginx在最后一波请求的时候特别慢(具体原因待排查),导致性能会差一些

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
Server Software:        nginx/1.8.1
Server Hostname: 192.168.1.3
Server Port: 8080

Document Path: /healthcheck
Document Length: 3 bytes

Concurrency Level: 100
Time taken for tests: 45.134 seconds
Complete requests: 10000
Failed requests: 17
(Connect: 0, Receive: 0, Length: 17, Exceptions: 0)
Non-2xx responses: 17
Keep-Alive requests: 9960
Total transferred: 1649167 bytes
HTML transferred: 39078 bytes
Requests per second: 221.56 [#/sec] (mean)
Time per request: 451.336 [ms] (mean)
Time per request: 4.513 [ms] (mean, across all concurrent requests)
Transfer rate: 35.68 [Kbytes/sec] received

Connection Times (ms)
min mean[+/-sd] median max
Connect: 0 0 3.4 0 39
Processing: 116 421 2375.9 177 26593
Waiting: 116 421 2375.9 177 26593
Total: 116 421 2375.9 177 26593

Percentage of the requests served within a certain time (ms)
50% 177
66% 187
75% 194
80% 198
90% 210
95% 223
98% 246
99% 13434
100% 26593 (longest request)

其实理论上要做并发更大的测试,但是考虑到下周会对网关做10KQPS的压测,这里先不弄,因为目前的结果已经符合预期了。
总的来说,我们的Vert.x+Kotlin Coroutine的HTTP网关的性能比Nginx要好,这个可能与它用了多个I/O线程有关,同时协程占用内存的问题,可能还需要进一步评估。

四、总结

  • 协程并没有像线程那样的时间片调度的方式,而是事件触发方式的,因此需要确保每个协程在执行过程一定不要有耗时操作,否则会影响其他协程的执行
  • 经过suspend和resume这两波操作,Vert.x竟然实现了和上一节说到的基于事件的差不过的效果,可见编程模型也o是挺重要的
  • 协程还有很多值得探讨的地方,例如异常处理、取消处理、超时处理等等。

五、Reference

  1. Kotlin:深度理解协程挂起恢复实现原理
  2. Kotlin Coroutine原理
  3. Kotlin Coroutine

当前网关的代码整体是异步阻塞的模式,阻塞主要体现在epoll的EventLoop那里,现在要在网关内部加一些逻辑,如果按照当前其他代码的方式,添加的是阻塞的代码,会导致EventLoop被阻塞,当所有EventLoop都被阻塞的时候(当前线程数是CPU两倍),网关不能处理新进来的请求的速度会减慢,整体的吞吐会降低。

一、如何解决

解决的办法,其实就是参考当前Vert.x的实现方式,一旦碰到耗时的操作,都放到EventLoop中去,例如需要从Socket读数据的时候,是将Socket的fd加到epoll中,并监听read事件,在read事件到达时候调用相应的处理函数,进行解包处理,并在包被完整解开之后调用相应的处理函数进行处理。
对应到实际代码中,以Redis的Letuce Client为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
EventExecutorGroup eventLoopGroup =  new DefaultEventExecutorGroup(8, new DefaultThreadFactory("wudan-group", true));

ClientResources clientResources = DefaultClientResources
.builder()
.eventExecutorGroup(eventLoopGroup)
.build();
RedisClient redisClient = RedisClient.create(clientResources, "redis://localhost:6379");
StatefulRedisConnection<String, String> connection = redisClient.connect();
RedisReactiveCommands<String, String> reactiveCommand = connection.reactive();

reactiveCommand.get("wudan").subscribe(s -> {
System.out.println(Thread.currentThread().getName() + " " + s);
});

Thread.sleep(1000L);

执行当前代码的线程和执行subscribe中打印代码的线程不是一个线程,因为subscribe是EventLoop在read事件之后触发的,并在EventLoop线程中执行了。
因此,对于网关的情况,我们只需要使用这种Reactive的方式来执行即可,对于Redis的操作,通过lettuce Client来执行,对于gRPC的调用,我们使用新的基于Reactor Core的Client。

二、问题

上述解决方案,存在一个小问题,由于不同的组建的EventLoop不是同一个,当我们从一个切换到另一个时,相当于把之后代码的运行权都交给了后面的EventLoop,存在一定的不可控性,因为后续的EventLoop可能没有fine tuned,以及当前的一些监控都没有加到后续的EventLoop上。
一个比较统一的方式是让所有程序块都用相同一个EventLoop,例如上面的代码中lettuce是支持设置EventLoop的。

三、延伸

关于同步、异步、阻塞、非阻塞这个古老的问题,认识又进一步加深了。个人的理解是

  • 异步和同步的差异在于,是否在同一个线程里处理耗时操作,同步是在当前线程,异步是在另一个线程,异步的好处是,如果有多个耗时操作,我可以并行的去执行,这样需要等的时间就是单个操作的Max时间,而同步的等待时间是所有操作的总时间
  • 阻塞和非阻塞的差异在于,是否直接调用耗时的系统调用(例如read),如果是直接调用read,那就是阻塞直到可读,如果是调用select、poll、epoll之类的系统调用,可以去执行别的代码(目前没找到例子),然后再回来不断的去调用epoll去查看,直到返回可读状态的时候,再去调用read系统调用,读取数据并处理

四、总结

第一部分中的判断是基于对于线程模型的理解,需要实际的压测结果来支持。
个人认为Reactor的方式已经基本接近异步非阻塞了,只是需要一个阻塞的EventLoop线程来对一堆fd做epoll操作,可以将上下文代码通过subscribe的方式来连接起来,核心点是需要将需要的数据和状态存储到一个context里。
由于Reactor依赖所有的代码都采用Reactor的方式,而很多互联网公司通常都会自己写一些组件,因此在整个公司层面存在这样的意识,否则无法让所有组件都支持Reactor,从而不能达到预期的目标。

五、Reference

  1. asynchronous and non-blocking calls? also between blocking and synchronous
  2. Asynchronous I/O

这一节主要讲解Nginx的健康检查算法,由于Nginx并未开源其健康检查算法,只在商用版本的Nginx Plus中提供这个功能,我们参考Tengine的实现来讨论

一、主动健康检查

主动健康检查是指在Nginx定时固定的时间去调用upstream中节点的特定接口,并根据返回判断节点的可用性,其配置文件如下

1
2
3
4
5
6
7
8
9
10
upstream hello {
vnswrr;
server 127.0.0.1:9090 weight=1;
server 127.0.0.1:9090 weight=2;
server 127.0.0.1:9090 weight=3;
check interval=3000 rise=2 fall=5 timeout=1000 type=http;
check_keepalive_requests 100;
check_http_send "HEAD /healthcheck HTTP/1.1\r\nHost: localhost:9090\r\n\r\n";
check_http_expect_alive http_2xx http_3xx;
}

上述配置项中,check那一行表示进行主动健康检查,检查周期是3秒一次,每次检查的超时是1秒,并在2次成功后认为节点是up状态,5次失败后认为节点是down状态,通过http发送健康检查请求;check_keepalive_requests那一行指定了保活的请求数,可以减少和upstream中节点频繁建立tcp连接;heck_http_send那一行表示发送的HTTP请求的内容;check_http_expect_alive那一行表示健康检查的返回状态码只要是2xx或者3xx就认为是成功。
具体到代码上,模块在初始化的时候,初始化了回调函数ngx_http_upstream_check_init_process,在这个函数中会添加check的定时事件到eventloop中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
ngx_module_t  ngx_http_upstream_check_module = {
NGX_MODULE_V1,
&ngx_http_upstream_check_module_ctx, /* module context */
ngx_http_upstream_check_commands, /* module directives */
NGX_HTTP_MODULE, /* module type */
NULL, /* init master */
NULL, /* init module */
ngx_http_upstream_check_init_process, /* init process */
NULL, /* init thread */
NULL, /* exit thread */
NULL, /* exit process */
NULL, /* exit master */
NGX_MODULE_V1_PADDING
};

最终调用upstream的接口,获取响应后得到result,result为1表示成功,result为0表示失败,并在下面的主体逻辑中进行判断,更新peer->shm->down这个标志位。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
static void
ngx_http_upstream_check_status_update(ngx_http_upstream_check_peer_t *peer,
ngx_int_t result)
{
ngx_http_upstream_check_srv_conf_t *ucscf;

ucscf = peer->conf;

ngx_shmtx_lock(&peer->shm->mutex);

if (peer->shm->delete == PEER_DELETED) {

ngx_shmtx_unlock(&peer->shm->mutex);
return;
}

if (result) {
// 健康检查成功
peer->shm->rise_count++;
peer->shm->fall_count = 0;
if (peer->shm->down && peer->shm->rise_count >= ucscf->rise_count) {
// 成功次数超rise,认为up
peer->shm->down = 0;
ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0,
"enable check peer: %V ",
&peer->check_peer_addr->name);
}
} else {
// 健康检查失败
peer->shm->rise_count = 0;
peer->shm->fall_count++;
if (!peer->shm->down && peer->shm->fall_count >= ucscf->fall_count) {
// 失败次数超过fall,认为down
peer->shm->down = 1;
ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0,
"disable check peer: %V ",
&peer->check_peer_addr->name);
}
}

peer->shm->access_time = ngx_current_msec;

ngx_shmtx_unlock(&peer->shm->mutex);
}

而在实际使用过程中,以vnswrr负载均衡算法为例,会过滤掉健康检查失败(down)的节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// ngx_http_upstream_vnswrr_module.c:ngx_http_upstream_get_vnswrr
#if (NGX_HTTP_UPSTREAM_CHECK)
if (ngx_http_upstream_check_peer_down(peer->check_index)) {
continue;
}
#endif


// ngx_http_upstream_check_module.c
ngx_uint_t
ngx_http_upstream_check_peer_down(ngx_uint_t index)
{
ngx_http_upstream_check_peer_shm_t *peer_shm;

if (upstream_check_index_invalid(check_peers_ctx, index)) {
return 0;
}

peer_shm = check_peers_ctx->peers_shm->peers;

// 通过down字段来决定是否down
return (peer_shm[index].down);
}

二、被动健康检查

被动健康检查,实际就是在每次请求upstream中节点的时候,根据返回数据的情况,来决定节点的健康情况,以下面的配置文件为例

1
2
3
4
5
6
upstream hello {
vnswrr;
server 127.0.0.1:9090 weight=1 max_fails=2 fail_timeout=1000;
server 127.0.0.1:9090 weight=2 max_fails=2 fail_timeout=1000;
server 127.0.0.1:9090 weight=3 max_fails=2 fail_timeout=1000;
}

其中max_fails表示失败多少次就认为节点down了,前提是check的事件没有超过fail_timeout毫秒数,具体到代码中,以vnswrr为例,会在选择节点的时候,过滤掉满足上述条件的节点。

1
2
3
4
5
6
7
8
// ngx_http_upstream_vnswrr_module.c:ngx_http_upstream_get_vnswrr

if (peer->max_fails
&& peer->fails >= peer->max_fails
&& now - peer->checked <= peer->fail_timeout)
{
continue;
}

而上述的失败次数其实是在下面的函数中被更新的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
// ngx_http_upstream_round_robin.c
void
ngx_http_upstream_free_round_robin_peer(ngx_peer_connection_t *pc, void *data,
ngx_uint_t state)
{
ngx_http_upstream_rr_peer_data_t *rrp = data;

time_t now;
ngx_http_upstream_rr_peer_t *peer;

ngx_log_debug2(NGX_LOG_DEBUG_HTTP, pc->log, 0,
"free rr peer %ui %ui", pc->tries, state);

/* TODO: NGX_PEER_KEEPALIVE */

peer = rrp->current;

ngx_http_upstream_rr_peers_rlock(rrp->peers);
ngx_http_upstream_rr_peer_lock(rrp->peers, peer);

if (rrp->peers->single) {

peer->conns--;

ngx_http_upstream_rr_peer_unlock(rrp->peers, peer);
ngx_http_upstream_rr_peers_unlock(rrp->peers);

pc->tries = 0;
return;
}

if (state & NGX_PEER_FAILED) {
now = ngx_time();

// 失败次数加1
peer->fails++;
peer->accessed = now;
// 更新修改时间
peer->checked = now;

if (peer->max_fails) {
// 权重降低
peer->effective_weight -= peer->weight / peer->max_fails;

if (peer->fails >= peer->max_fails) {
ngx_log_error(NGX_LOG_WARN, pc->log, 0,
"upstream server temporarily disabled");
}
}

ngx_log_debug2(NGX_LOG_DEBUG_HTTP, pc->log, 0,
"free rr peer failed: %p %i",
peer, peer->effective_weight);

if (peer->effective_weight < 0) {
peer->effective_weight = 0;
}

} else {

/* mark peer live if check passed */
if (peer->accessed < peer->checked) {
peer->fails = 0;
}
}

peer->conns--;

ngx_http_upstream_rr_peer_unlock(rrp->peers, peer);
ngx_http_upstream_rr_peers_unlock(rrp->peers);

if (pc->tries) {
pc->tries--;
}
}

而上述函数的主要调用点事 nginx_http_upstream.c:ngx_http_upstream_next 函数,主要是在一些连接失败、参数或者响应格式错误、请求响应超时的时候调用,并不涉及到http的status code的判断。从我的角度上看基本就是连接断开的情况。

三、总结

健康检查最终影响的是节点的状态是up还是down,只要在不同的负载均衡算法选择peer的过程去判断这个状态即可。主动负载均衡是使用了共享内存,方便了不同Worker可以共享这份数据。

四、Reference

  1. Tengine
  2. ngx_http_upstream_check_module

这一节主要讲解Nginx的负载均衡算法,并介绍Tengine新引入的vnswrr算法(目前我司的接入层使用的负载均衡算法)。在讨论不同的算法前,重申下前提,下面的算法基本都会在每次选择一个节点之后,修改节点的current_weight,这种情况其实是线程不安全的,但是,由于Nginx是多进程(Worker),Master进程fork出Worker进程之后,两个进程的地址空间不共享(在读的时候共享,发生写的时候会复制一份),Worker和Master以及Worker之间都是线程安全的,且Worker内部是单线程的eventloop,因此是没有问题的。

一、(Weighted)Round Robin

(带权重)轮询算法是非常常用的负载均衡算法,例如按照下面的配置三个权重分别是1、2、3的三个server,分别用A、B、C表示

1
2
3
4
5
upstream hello {
server 127.0.0.1:9090 weight=1;
server 127.0.0.1:9090 weight=2;
server 127.0.0.1:9090 weight=3;
}

为了方便选择,引入current_weight的概念,current_weight初始值为0,同时总的权重值称为total_weight,当前为6。算法执行的过程是,每次server节点增加weight到currnet_weight,然后选择current_weight最大的,并将这个节点的current_weight减去total_weigth,具体如下图,绿色节点代表本轮选中的节点
weigted round robin
上述一轮选择过程C被选中3次,B被选中2次,A被选中1次,符合权重的设置,且选择较为平滑,每个节点没有被连续选中。注意在实际使用过程,上述weight其实用的是effective_weight,表示当前节点的动态权重,这个权重不会超过weight,因为节点运行过程可能会上下线或者故障,因此effective_weight其实是变化的,整个算法的时间复杂度为O(N)。Nginx里对应的选择逻辑的核心算法如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
// ngx_http_upstream_round_robin.c
static ngx_http_upstream_rr_peer_t *
ngx_http_upstream_get_peer(ngx_http_upstream_rr_peer_data_t *rrp)
{
time_t now;
uintptr_t m;
#if (T_NGX_HTTP_UPSTREAM_RANDOM)
ngx_int_t total, flag;
#else
ngx_int_t total;
#endif
ngx_uint_t i, n, p;
ngx_http_upstream_rr_peer_t *peer, *best;

now = ngx_time();

best = NULL;
total = 0;

#if (NGX_SUPPRESS_WARN)
p = 0;
#endif

#if (T_NGX_HTTP_UPSTREAM_RANDOM)
// 随机出一个起始值
if (rrp->peers->init_number == NGX_CONF_UNSET_UINT) {
rrp->peers->init_number = ngx_random() % rrp->peers->number;
}

for (peer = rrp->peers->peer, i = 0; i < rrp->peers->init_number; i++) {
peer = peer->next;
}

flag = 1;
for (i = rrp->peers->init_number;
i != rrp->peers->init_number || flag;
i = (i + 1) % rrp->peers->number,
peer = peer->next ? peer->next : rrp->peers->peer)
{
flag = 0;

#else
for (peer = rrp->peers->peer, i = 0;
peer;
peer = peer->next, i++)
{
#endif
n = i / (8 * sizeof(uintptr_t));
m = (uintptr_t) 1 << i % (8 * sizeof(uintptr_t));

if (rrp->tried[n] & m) {
continue;
}

if (peer->down) {
continue;
}

#if (NGX_HTTP_UPSTREAM_CHECK)
if (ngx_http_upstream_check_peer_down(peer->check_index)) {
continue;
}
#endif

if (peer->max_fails
&& peer->fails >= peer->max_fails
&& now - peer->checked <= peer->fail_timeout)
{
continue;
}

if (peer->max_conns && peer->conns >= peer->max_conns) {
continue;
}

// 这里加的是effective_weight
peer->current_weight += peer->effective_weight;
total += peer->effective_weight;

if (peer->effective_weight < peer->weight) {
peer->effective_weight++;
}

// 选current_weight最大的
if (best == NULL || peer->current_weight > best->current_weight) {
best = peer;
p = i;
}
}

if (best == NULL) {
return NULL;
}

rrp->current = best;

n = p / (8 * sizeof(uintptr_t));
m = (uintptr_t) 1 << p % (8 * sizeof(uintptr_t));

rrp->tried[n] |= m;

// current_weitht减去总权重
best->current_weight -= total;

if (now - best->checked > best->fail_timeout) {
best->checked = now;
}

// 选择的结果
return best;
}

二、(Weighted)Random

在权重的基础上,随机选择,配置文件如下

1
2
3
4
5
6
upstream hello {
random;
server 127.0.0.1:9090 weight=1;
server 127.0.0.1:9090 weight=2;
server 127.0.0.1:9090 weight=3;
}

选择的流程例子如下
weighted random
Nginx代码实现如下,时间复杂度为O(log(TW))

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// ngx_http_upstream_random_module.c
// 初始化range
static ngx_int_t
ngx_http_upstream_update_random(ngx_pool_t *pool,
ngx_http_upstream_srv_conf_t *us)
{
// ... ...
total_weight = 0;

// 构造每个节点的range
for (peer = peers->peer, i = 0; peer; peer = peer->next, i++) {
ranges[i].peer = peer;
ranges[i].range = total_weight;
total_weight += peer->weight;
}

rcf->ranges = ranges;

return NGX_OK;
}

static ngx_uint_t
ngx_http_upstream_peek_random_peer(ngx_http_upstream_rr_peers_t *peers,
ngx_http_upstream_random_peer_data_t *rp)
{
ngx_uint_t i, j, k, x;

x = ngx_random() % peers->total_weight;

i = 0;
j = peers->number;

// 二分查找
while (j - i > 1) {
k = (i + j) / 2;

if (x < rp->conf->ranges[k].range) {
j = k;

} else {
i = k;
}
}

return i;
}

三、(Weigted)Hash

配置文件如下,下面是根据请求的uri哈希,也就是每次只要请求的uri不变,打到的upstream节点也是不会变的

1
2
3
4
5
6
upstream hello {
hash $request_uri;
server 127.0.0.1:9090 weight=1;
server 127.0.0.1:9090 weight=2;
server 127.0.0.1:9090 weight=3;
}

核心算法如下,先通过crc32算法计算出hash,然后将hash对总权重取模,再利用Random算法类似的原理根据权重选节点(只存在输入参数的差异),只是这里用的方法的时间复杂度是O(N)

1
2
3
4
5
6
7
8
9
10
// ngx_http_upstream_hash_module.c:ngx_http_upstream_get_hash_peer
w = hp->hash % hp->rrp.peers->total_weight;
peer = hp->rrp.peers->peer;
p = 0;

while (w >= peer->weight) {
w -= peer->weight;
peer = peer->next;
p++;
}

除了上述这种通用的hash,还存在ip_hash,算法基本是一样,只是hash的来源换成了client的ip,这里就不赘述。值得注意的是hash方式容易产生流量分配不均的问题,因此hash参数的选择需要慎重,一般是在statefull的后端(例如带session的情况)才需要使用基于Hash的负载均衡。crc32的hash离散程度应该是不及mur_mur_hash的。

四、(Weighted)Least Connection

按照每个server上的连接数来决定,同时考虑了权重,例如节点A权重为2,连接数为2,节点B权重为4,连接数为3,在选择的时候会选择B。
配置文件如下

1
2
3
4
5
6
upstream hello {
least_conn;
server 127.0.0.1:9090 weight=1;
server 127.0.0.1:9090 weight=2;
server 127.0.0.1:9090 weight=3;
}

核心算法如下,当遇到多个节点连接数一样的时候(many=1),会用之前讲到的weight round robin来选择出一个,整体时间复杂度为O(N)

1
2
3
4
5
6
7
8
9
10
11
12
// ngx_http_upstream_least_conn_module.c:ngx_http_upstream_get_least_conn_peer
if (best == NULL
|| peer->conns * best->weight < best->conns * peer->weight)
{
best = peer;
many = 0;
p = i;

} else if (peer->conns * best->weight == best->conns * peer->weight) {
// 如果存在多个,后续会按照weight round robin来随机一个
many = 1;
}

五、vnswrr(Virtual Node Smooth Weighted Round Robin)

这个负载均衡算法是淘宝开源的Tengine新加入的算法,配置文件如下,只是多了个vnswrr

1
2
3
4
5
6
upstream hello {
vnswrr;
server 127.0.0.1:9090 weight=1;
server 127.0.0.1:9090 weight=2;
server 127.0.0.1:9090 weight=3;
}

其执行过程如下图,这里把起始点进行了随机选择(图中选择第三个),其实质就是按照权重把节点都创建到一个数组(按照 weight round robin的顺序),然后轮流选择
virtual node smoth weitht round robin
就是这个傻瓜算法,Tengine官方没提供详细的介绍,而且取了个玄乎的名字。算法属于空间换时间,时间复杂度从O(N)降低O(1),而空间复杂度从O(1)提升到了O(TW),Tengine官方声称性能60%(节点数2000个时),但是没提供具体的权重数据,如果权重达到50000时,同时每个虚拟节点占用16B,会占用1G+的内存,当然实际使用过程一般不会配置这么大的权重。
Tengine的具体代码实现如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
// ngx_http_upstream_vnswrr_module.c
static ngx_http_upstream_rr_peer_t *
ngx_http_upstream_get_vnswrr(ngx_http_upstream_vnswrr_peer_data_t *vnsp)
{
time_t now;
uintptr_t m;
ngx_uint_t i, n, p, flag, begin_number;
ngx_http_upstream_rr_peer_t *peer, *best;
ngx_http_upstream_rr_peers_t *peers;
ngx_http_upstream_rr_vpeers_t *vpeers;
ngx_http_upstream_rr_peer_data_t *rrp;
ngx_http_upstream_vnswrr_srv_conf_t *uvnscf;

now = ngx_time();

best = NULL;

#if (NGX_SUPPRESS_WARN)
p = 0;
#endif

rrp = &vnsp->rrp;
peers = rrp->peers;
uvnscf = vnsp->uvnscf;
vpeers = uvnscf->vpeers;

// 初始化init_number/laster_number(只执行一次),尽量让每个worker的流量打散到不同的upstream机器
if (uvnscf->last_number == NGX_CONF_UNSET_UINT) {
uvnscf->init_number = ngx_random() % peers->number;

if (peers->weighted) {
peer = vpeers[uvnscf->init_number].vpeer;

} else {
for (peer = peers->peer, i = 0; i < uvnscf->init_number; i++) {
peer = peer->next;
}
}

uvnscf->last_number = uvnscf->init_number;
uvnscf->last_peer = peer;
}

// 表示upstream机器后配置了权重
if (peers->weighted) {
// 运行时初始化虚拟节点,虚拟节点存储在vpeers数组,总的数量为peers->total_weight
/* batch initialization vpeers at runtime. */
if (uvnscf->vnumber != peers->total_weight
&& (uvnscf->last_number + 1 == uvnscf->vnumber))
{
n = peers->total_weight - uvnscf->vnumber;
if (n > peers->number) {
n = peers->number;
}

ngx_http_upstream_init_virtual_peers(peers, uvnscf, uvnscf->vnumber,
n + uvnscf->vnumber);

}

begin_number = (uvnscf->last_number + 1) % uvnscf->vnumber;
peer = vpeers[begin_number].vpeer;

} else {
// 轮询
if (uvnscf->last_peer && uvnscf->last_peer->next) {
begin_number = (uvnscf->last_number + 1) % peers->number;
peer = uvnscf->last_peer->next;

} else {
begin_number = 0;
peer = peers->peer;
}
}

// 看着是循环,其实正常情况下只执行一次
for (i = begin_number, flag = 1; i != begin_number || flag;
i = peers->weighted
? ((i + 1) % uvnscf->vnumber) : ((i + 1) % peers->number),
peer = peers->weighted
? vpeers[i].vpeer : (peer->next ? peer->next : peers->peer))
{

flag = 0;
if (peers->weighted) {

n = peers->total_weight - uvnscf->vnumber;
if (n > peers->number) {
n = peers->number;
}

// 再次确保虚拟节点都初始化了,如果已经初始化了就啥也不会做
ngx_http_upstream_init_virtual_peers(peers, uvnscf, uvnscf->vnumber,
n + uvnscf->vnumber);
// bitmap,重试的时候不选择上次用的upstream实例
n = vpeers[i].rindex / (8 * sizeof(uintptr_t));
m = (uintptr_t) 1 << vpeers[i].rindex % (8 * sizeof(uintptr_t));

} else {
n = i / (8 * sizeof(uintptr_t));
m = (uintptr_t) 1 << i % (8 * sizeof(uintptr_t));
}

// bitmap,重试的时候不选择上次用的upstream实例
if (rrp->tried[n] & m) {
continue;
}

// 不选择donw了的
if (peer->down) {
continue;
}

// 不选择失败太多的
if (peer->max_fails
&& peer->fails >= peer->max_fails
&& now - peer->checked <= peer->fail_timeout)
{
continue;
}

#if defined(nginx_version) && nginx_version >= 1011005
// 不选择连接数太多的
if (peer->max_conns && peer->conns >= peer->max_conns) {
continue;
}
#endif

#if (NGX_HTTP_UPSTREAM_CHECK)
// 不选择healthcheck认定为down了的
if (ngx_http_upstream_check_peer_down(peer->check_index)) {
continue;
}
#endif

best = peer;
uvnscf->last_peer = peer;
uvnscf->last_number = i;
p = i;
break;
}

if (best == NULL) {
return NULL;
}

rrp->current = best;

if (peers->weighted) {
n = vpeers[p].rindex / (8 * sizeof(uintptr_t));
m = (uintptr_t) 1 << vpeers[p].rindex % (8 * sizeof(uintptr_t));

} else {
n = p / (8 * sizeof(uintptr_t));
m = (uintptr_t) 1 << p % (8 * sizeof(uintptr_t));
}

// 设置bitmap
rrp->tried[n] |= m;

if (now - best->checked > best->fail_timeout) {
best->checked = now;
}

// 选择的结果
return best;
}

六、总结

负载均衡算法是Nginx比较基础的功能,但是大部分算法的时间复杂度都达到了O(N)级别,只有random达到了O(log(TW))级别,不知道为什么没有优化(如果upstream里节点较少其实影响不大)。vnswrr算法性能更好,但是官方没有patch到Nginx主分支中,这又是什么原因呢?
个人理解可能是国外开发者比较注重代码的可读性、简单性,简单的代码是最不容易出错的,因为高效率的算法也是有代价的,例如vnswrr的代价就是内存占用多了如果权重配的很大,同时后端的server节点数量很多时,每个Worker的负载均衡的内存使用可能要上G,也是对内存的一种浪费。

七、Reference

  1. Tengine
  2. 深入浅出Nginx负载均衡算法
  3. ngx_http_upstream_vnswrr_module

最近在做网关相关工作,和Nginx关系比较密切,故了解下它的具体实现。用cloc简单看了下,Nginx总共的C代码不到10W行,基本和Redis差不多,可以说是非常少了,而且Nginx的代码非常经典,以快速和高效著称,值得一试。考虑到Nginx的功能太基础,大多数国内互联网公司都使用的是淘宝开发的基于Nginx扩展出来的Tengine,其中包括负载均衡算法支持vnswrr(虚拟节点平滑权重轮询),以及主动healthcheck功能(Nginx的healthcheck功能需要购买Nginx Plus才可以使用),因此以Tengine作为分析对象。

一、环境准备

先运行下面的命令,下载和配置编译

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 系统环境 macOS Big Sur

# 安装openssl依赖
cd ~/software
wget https://github.com/openssl/openssl/archive/refs/heads/master.zip
unzip master.zip
cd openssl-master
./config -d
make

# 下载阶段
cd ~/software
# 如果是nginx可以下载 https://nginx.org/download/nginx-1.8.1.tar.gz
wget https://github.com/alibaba/tengine/archive/refs/heads/master.zip
unzip master.zip
cd tengine
mkdir -p workspace/logs
# html 目录里面有一些静态文件例如: index.html
cp -rf html workspace/

# 编译配置阶段
brew install pcre -y #Nginx rewrite功能的一个依赖,正则表达式相关
./configure --prefix=./workspace --with-debug --add-module=./modules/ngx_http_upstream_vnswrr_module --add-module=./modules/ngx_http_upstream_check_module --with-openssl=/Users/danwu/software/openssl-master
# 上面的命令会在obj目录下生成Makefile文件

把生成的Makefile中default改成all(或者修改Clion的preference中Makefile的build target从all改成default)
然后,使用Clion打开Makefile文件,选择Open as Project,注意要把弹出的Clean命令勾选去掉,不然执行clean就把Makefile删掉了。
点击左上角绿色的锤子执行编译,通过编译日志中的-g选项可以确认是带debug信息的编译。

在conf/nginx.conf配置文件如下,其中master_process off能确保单线程运行(不添加worker),这个是可以debug的关键。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
#user  nobody;
worker_processes auto;
# 关闭后台进程模式,运行在前台
daemon off;
# 单进程模式,只在debug的时候关闭
master_process off;

error_log logs/error.log;
error_log logs/error.log notice;
error_log logs/error.log debug;
error_log "pipe:rollback logs/error_log interval=1d baknum=7 maxsize=2G";

pid logs/nginx.pid;

events {
worker_connections 1024;
}

http {
include mime.types;
default_type application/octet-stream;

log_format main '$remote_addr - $remote_user [$time_local] "$request" '
'$status $body_bytes_sent "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for"';

access_log logs/access.log main;
access_log "pipe:rollback logs/access_log interval=1d baknum=7 maxsize=2G" main;

sendfile on;
#tcp_nopush on;

#keepalive_timeout 0;
keepalive_timeout 65;

#gzip on;

upstream hello {
vnswrr;
server 127.0.0.1:9090 weight=5;
server 127.0.0.1:9090 weight=2;
check interval=3000 rise=2 fall=5 timeout=1000 type=http;
check_keepalive_requests 100;
check_http_send "HEAD /healthcheck HTTP/1.1\r\nHost: localhost:9090\r\n\r\n";
check_http_expect_alive http_2xx http_3xx;
}

server {
listen 9999;
server_name localhost;

location / {
root html;
index index.html index.htm;
}

error_page 500 502 503 504 /50x.html;
location = /50x.html {
root html;
}

location /hello {
proxy_pass http://hello;
}
}
}

编辑Run -> Edit Configurations,选择target为all或者default/build,然后Executable为obj/nginx,添加启动参数

1
-c /Users/wudan03/software/tengine/conf/nginx.conf

为了让upstream生效,我们起一个golang的http server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 通过 go run main.go 运行即可
package main

import (
"fmt"
"log"
"time"
"net/http"
)

func helloHandler(w http.ResponseWriter, r *http.Request) {
time.Sleep(100 * time.Millisecond)
fmt.Fprintf(w, "Hello World!")
}

func healthcheckHandler(w http.ResponseWriter, r *http.Request) {
time.Sleep(100 * time.Millisecond)
fmt.Fprintf(w, "Ok!")
}

func main() {
fileServer := http.FileServer(http.Dir("./static"))
http.Handle("/", fileServer)
http.HandleFunc("/hello", helloHandler)
http.HandleFunc("/healthcheck", healthcheckHandler)

fmt.Printf("Starting server at port 9090\n")
if err := http.ListenAndServe(":9090", nil); err != nil {
log.Fatal(err)
}
}

然后在ngx_http_request.c:ngx_http_process_request方法中打断点,并点击绿色甲壳虫开始debug,使用下面的curl触发调用

1
curl "http://127.0.0.1:9999/index.html"

通过上述curl请求验证程序在断点处停止执行

二、http请求处理流程

首先需要明确几个基本概念

a. 多进程/多线程

Nginx使用1个Master + N个Worker的方式运行,Worker数量一般为CPU数量,Worker是通过fork创建出来的(线程是pthread_create创建出来的),这里使用进程而不是线程的原因是,地址空间是隔离的,不会互相影响,不存在竞争,因为线程之间是可以互相共享数据的,典型的例子是负载均衡如下一篇讲到的round robin算法,每次都要记录轮询到了哪个位置,如果是并发的就需要加锁,性能就会很差。
问:Redis为什么不能像Nginx一样多线程?
答:Nginx本身是stateless的,不存储实际的数据,数据都来自配置文件,可以每个进程维护一份,而Redis本身需要读写数据,如果多进程,实际是需要加锁的。我们可以通过在一台机器上部署多个Redis实例来充分利用CPU

b. NIO(epoll/kqueue)

Nginx使用的是经典的事件驱动的NIO模型,Linux下epoll/macOS下kqueue来处理外来请求,并通过hander来绑定事件触发之后的回调函数。

c. socket

bio socket server的典型流程是:

  1. 调用socket系统调用,创建一个fd(File Descriptor)
  2. 调用bind系统调用,将第一步创建的socket绑定到本地的端口
  3. 调用listen系统调用,开始监听端口上新来的连接
  4. 在死循环中调用accept系统调用,会阻塞知道有新的连接过来,会返回一个新的fd,程序创建新的线程或者fork新的进程,在里面处理这个fd上的数据读写
    这个过程只是要说明一件事,监听端口的fd和新进来的请求的fd不是同一个,因此在监听端口的fd的读操作的handler中处理新连接即可,然后在新请求的fd的读写操作的handler中分别做读写数据的处理

Nginx处理upstream请求的大致流程如下图
nginx process overview
整个流程通过NIO eventloop连接起来,不是太好串联,其中Step8应该是从eventloop中出来的,图里画不下了。
其中Step7主要是发请求给proxy,而Step8主要是将proxy发回的数据写回Downstream(即连到nginx的client)

三、总结

整理的流程非常复杂,但是我们只要抓到设置handler的地方就能找到下个事件的处理函数,同时要紧跟添加事件的地方。整个代码还包含了缓存、过滤等逻辑,以及处理静态文件和其他协议、以及https的请求的过程,这里只介绍了个大概,按需处理,后面两期分别介绍目前比较关注的负载均衡和healthcheck机制。

四、Reference

  1. Tengine
  2. linux下gcc编程10-clion编译调试nginx
  3. Nginx源码分析 - HTTP模块篇 - HTTP Request解析过程
  4. Nginx源码分析

简介

众所周知,Java不需要像C一样在创建对象之后,通过free来销毁对象,因为Java会有专门的gc程序通过可达性分析来回收不用的对象,而可达性分析使用的是引用,Java中引用分为强引用、弱引用(WeakReference)、软引用(SoftReference)、虚引用(PhantomReference),不同的引用类型在gc的时候会有不同的处理策略,下面做简要分析。

强引用

最常见的引用,new出来的对象就是强引用,只要有强引用的对象就不会被gc程序回收

弱引用(WeakReference)

在gc扫到弱引用的时候,就会回收弱引用所指向的对象(前提是它没有别的强引用指向了),可以通过下面的代码佐证

1
2
3
4
WeakReference<Object> weak = new WeakReference<>(new Object());
System.out.println(weak.get());// 返回object
System.gc();
System.out.println(weak.get());// 返回null

软引用(SoftReference)

软引用是在内存不够的时候,gc程序会去回收,我们先将最大堆内存设置成6M(-Xmx6m),运行下面的代码验证

1
2
3
4
5
6
// -Xmx6m
SoftReference<int[]> soft = new SoftReference<>(new int[512 * 1024]);
System.out.println(soft.get());// 返回数组,因为内存还够
int[] arr2 = new int[512 * 1024]; // 内存不够了,会gc
arr2[0] = 1;
System.out.println(soft.get());// 返回null,因为内存不够了

虚引用(PhantomReference)

虚引用,创建之后,get出来的值一直都是null,创建虚引用的时候必须要传一个引用队列进去,会在被引用对象gc的时候将引用放到引用队列中,然后我们可以通过探测这个引用队列来检测对象的释放过程,在对象释放的时候做一些事情,运行下面的代码验证

1
2
3
4
5
6
7
8
9
10
11
ReferenceQueue referenceQueue = new ReferenceQueue();
PhantomReference<Object> soft = new PhantomReference<>(new Object(), referenceQueue);
System.out.println(soft.get());// 返回null
System.out.println(referenceQueue.poll());// poll不出来,因为没有gc
System.gc();
Thread.sleep(5000);
Reference poll = referenceQueue.poll();// 可以poll出来,因为gc了
System.out.println(poll);
if (poll != null) {
System.out.println(poll.get());
}

虚引用的一个典型的应用,是在Java的DirectBuffer中,它控制了直接内存可以由gc程序去回收,而不用手动回收,
我们默认的new对象都是在堆内创建的,会被jvm管理,如果需要创建不受jvm的gc管理的内存(称为堆外内存),则需要通过Unsafe调用native方法进行分配

1
2
3
4
5
6
7
8
9
10
11
private static void allocateDirect() {
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);
byteBuffer.put(0, (byte) 0x55);
System.out.println(byteBuffer.get(0));
}
public static void main(String[] args) throws InterruptedException {
allocateDirect();
System.gc();
Thread.sleep(5000);
System.exit(0);
}

上面的代码需要点进去才能看到,在创建DirectBuffer的时候,会创建一个Cleaner(继承了PhantomReference),里面带一个Deallocator专门负责调用Unsafe去释放堆外内存,

1
2
// Cleaner是PhantomReference的实现类,虚引用在引用队列中被poll出来之后,也是get不出来东西的,需要根据需要把一些数据放到PhantomReference中或者存储一个Reference到这些数据的一个对应关系map,才能在poll出来之后做一些实际的操作
cleaner = Cleaner.create(this, new Deallocator(base, size, cap));

Cleaner的clean方法中会调用Deallocator,而clean方法会被Reference中的静态变量ReferenceHandler线程池调用,线程池会不断获取最近被回收的引用(pending),然后调用tryHandlePending方法去尝试执行Cleaner的clean方法。
注意到这个最近被回收的引用(pending)是垃圾回收器给设置值进去的,这种交互方式会感觉挺奇怪的,但是没有办法,gc程序不存在于java代码库中

1
2
3
4
5
6
/* List of References waiting to be enqueued.  The collector adds
* References to this list, while the Reference-handler thread removes
* them. This list is protected by the above lock object. The
* list uses the discovered field to link its elements.
*/
private static Reference<Object> pending = null;

总结

引用是很老旧的概念,实际应用比较狭窄,在Guava Cache中有WeakKey、WeakValue、SoftValue的设置来将key、value设置成不同的引用,但是实际使用的时候基本都是用缓存时间或者是缓存的大小来控制缓存,很少有用引用。而虚引用也只在一些特殊的情况下使用,总的来说这些都是有用的概念,但是只会在很关键的时候使用,平时使用很少。

Reference

  1. Netty源码—七、内存释放

简介

Netty是一个高性能的NIO框架,理论上说它工作在四层(传输层/TCP、UDP等),我们可以在上面构建各种类型的七层(应用层/HTTP等)应用,例如HTTP服务(HTTP协议)、长连接Socket Server(自定义协议)等等。我们了解到TCP会拆包发送,因为TCP包的最大大小为65535 Byte,如果HTTP的包太大,就需要拆成多个包发送,这样就需要在接收的时候讲这些包重新组合起来(整个组合过程是对HTTP层透明的,HTTP层不会感知到),我们看看Netty是如何处理的。

测试代码

使用下面这段代码测试,其中比较重要的是 HttpRequestDecoder、HttpObjectAggregator、HttpServerHandler,三个都是ChannelInboundHandler,用于处理接收到的请求,而HttpResponseEncoder则相反,是ChannelOutboundHandler,用来处理返回给客户端的请求。其中boss负责accept客户端的请求,worker负责处理和客户端的数据读写通信。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
public class NettyHttpServer {
public static void main(String[] args) {
NioEventLoopGroup bossGroup = new NioEventLoopGroup(new DefaultThreadFactory("boss"));
NioEventLoopGroup workerGroup = new NioEventLoopGroup(4, new DefaultThreadFactory("worker"));

try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_BACKLOG, 120)
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(Channel channel) throws Exception {
channel.pipeline().addLast(new IdleStateHandler(10, 10, 10, TimeUnit.MINUTES));
channel.pipeline().addLast(new HttpRequestDecoder());
channel.pipeline().addLast(new HttpResponseEncoder());
channel.pipeline().addLast(new HttpObjectAggregator(4 * 1024 * 1024));
channel.pipeline().addLast(new HttpServerHandler());
}
})
.childOption(ChannelOption.SO_KEEPALIVE, true);

ChannelFuture channelFuture = serverBootstrap.bind(8888).sync();
channelFuture.channel().closeFuture().get();
} catch (Exception e) {
System.out.println(e);
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

public class HttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
private static final String FAVICON_ICO = "/favicon.ico";
private static final AsciiString CONNECTION = AsciiString.cached("Connection");
private static final AsciiString KEEP_ALIVE = AsciiString.cached("keep-alive");
private static final AsciiString CONTENT_TYPE = AsciiString.cached("Content-Type");
private static final AsciiString CONTENT_LENGTH = AsciiString.cached("Content-Length");

@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest fullHttpRequest) {
String uri = fullHttpRequest.uri();
if (uri.equals(FAVICON_ICO)) {
return;
}
Object result;
FullHttpResponse response;
try {
byte[] responseBytes = "{\"status\":0}".getBytes();
response = new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(responseBytes));
response.headers().set(CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON);
response.headers().setInt(CONTENT_LENGTH, response.content().readableBytes());
} catch (IllegalArgumentException e) {
e.printStackTrace();
byte[] responseBytes = "{\"status\":1}".getBytes();
response = new DefaultFullHttpResponse(HTTP_1_1, INTERNAL_SERVER_ERROR, Unpooled.wrappedBuffer(responseBytes));
response.headers().set(CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON);
}
boolean keepAlive = HttpUtil.isKeepAlive(fullHttpRequest);
if (!keepAlive) {
ctx.write(response).addListener(ChannelFutureListener.CLOSE);
} else {
response.headers().set(CONNECTION, KEEP_ALIVE);
ctx.write(response);
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
}

解码器

解码的主要工作在 HttpRequestDecoder、HttpObjectAggregator 中,其中 HttpRequestDecoder 主要负责将Socket接收到的ByteBuffer转换成不同的自定义成分,例如: HttpMessage(包含method、path、version,以及所有header,基本就是除body之外的数据)、HttpContent(body数据,主要包括 DefaultHttpContent 和 DefaultLastHttpContent)。
HttpObjectDecoder 的 decode 方法,通过状态机的方式处理请求(回答了最开始提出的问题),先读取initial line(包括method、path、version),然后读取headers,然后根据header里的数据来读取body(主要分为chunked和普通body,chunked按照chunk size + chunk content + 空行组成,注意chunk的最后一行size是0),由于每个socket连接都会有一个这样的Handler,因此不存在线程竞争的问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
if (resetRequested) {
resetNow();
}

switch (currentState) {
case SKIP_CONTROL_CHARS:
// Fall-through
case READ_INITIAL: try {
AppendableCharSequence line = lineParser.parse(buffer);
if (line == null) {
return;
}
String[] initialLine = splitInitialLine(line);
if (initialLine.length < 3) {
// Invalid initial line - ignore.
currentState = State.SKIP_CONTROL_CHARS;
return;
}

message = createMessage(initialLine);
currentState = State.READ_HEADER;
// fall-through
} catch (Exception e) {
out.add(invalidMessage(buffer, e));
return;
}
case READ_HEADER: try {
State nextState = readHeaders(buffer);
if (nextState == null) {
return;
}
currentState = nextState;
switch (nextState) {
case SKIP_CONTROL_CHARS:
// fast-path
// No content is expected.
out.add(message);
out.add(LastHttpContent.EMPTY_LAST_CONTENT);
resetNow();
return;
case READ_CHUNK_SIZE:
if (!chunkedSupported) {
throw new IllegalArgumentException("Chunked messages not supported");
}
// Chunked encoding - generate HttpMessage first. HttpChunks will follow.
out.add(message);
return;
default:
/**
* <a href="https://tools.ietf.org/html/rfc7230#section-3.3.3">RFC 7230, 3.3.3</a> states that if a
* request does not have either a transfer-encoding or a content-length header then the message body
* length is 0. However for a response the body length is the number of octets received prior to the
* server closing the connection. So we treat this as variable length chunked encoding.
*/
long contentLength = contentLength();
if (contentLength == 0 || contentLength == -1 && isDecodingRequest()) {
out.add(message);
out.add(LastHttpContent.EMPTY_LAST_CONTENT);
resetNow();
return;
}

assert nextState == State.READ_FIXED_LENGTH_CONTENT ||
nextState == State.READ_VARIABLE_LENGTH_CONTENT;

out.add(message);

if (nextState == State.READ_FIXED_LENGTH_CONTENT) {
// chunkSize will be decreased as the READ_FIXED_LENGTH_CONTENT state reads data chunk by chunk.
chunkSize = contentLength;
}

// We return here, this forces decode to be called again where we will decode the content
return;
}
} catch (Exception e) {
out.add(invalidMessage(buffer, e));
return;
}
case READ_VARIABLE_LENGTH_CONTENT: {
// Keep reading data as a chunk until the end of connection is reached.
int toRead = Math.min(buffer.readableBytes(), maxChunkSize);
if (toRead > 0) {
ByteBuf content = buffer.readRetainedSlice(toRead);
out.add(new DefaultHttpContent(content));
}
return;
}
case READ_FIXED_LENGTH_CONTENT: {
int readLimit = buffer.readableBytes();

// Check if the buffer is readable first as we use the readable byte count
// to create the HttpChunk. This is needed as otherwise we may end up with
// create an HttpChunk instance that contains an empty buffer and so is
// handled like it is the last HttpChunk.
//
// See https://github.com/netty/netty/issues/433
if (readLimit == 0) {
return;
}

int toRead = Math.min(readLimit, maxChunkSize);
if (toRead > chunkSize) {
toRead = (int) chunkSize;
}
ByteBuf content = buffer.readRetainedSlice(toRead);
chunkSize -= toRead;

if (chunkSize == 0) {
// Read all content.
out.add(new DefaultLastHttpContent(content, validateHeaders));
resetNow();
} else {
out.add(new DefaultHttpContent(content));
}
return;
}
/**
* everything else after this point takes care of reading chunked content. basically, read chunk size,
* read chunk, read and ignore the CRLF and repeat until 0
*/
case READ_CHUNK_SIZE: try {
AppendableCharSequence line = lineParser.parse(buffer);
if (line == null) {
return;
}
int chunkSize = getChunkSize(line.toString());
this.chunkSize = chunkSize;
if (chunkSize == 0) {
currentState = State.READ_CHUNK_FOOTER;
return;
}
currentState = State.READ_CHUNKED_CONTENT;
// fall-through
} catch (Exception e) {
out.add(invalidChunk(buffer, e));
return;
}
case READ_CHUNKED_CONTENT: {
assert chunkSize <= Integer.MAX_VALUE;
int toRead = Math.min((int) chunkSize, maxChunkSize);
if (!allowPartialChunks && buffer.readableBytes() < toRead) {
return;
}
toRead = Math.min(toRead, buffer.readableBytes());
if (toRead == 0) {
return;
}
HttpContent chunk = new DefaultHttpContent(buffer.readRetainedSlice(toRead));
chunkSize -= toRead;

out.add(chunk);

if (chunkSize != 0) {
return;
}
currentState = State.READ_CHUNK_DELIMITER;
// fall-through
}
case READ_CHUNK_DELIMITER: {
final int wIdx = buffer.writerIndex();
int rIdx = buffer.readerIndex();
while (wIdx > rIdx) {
byte next = buffer.getByte(rIdx++);
if (next == HttpConstants.LF) {
currentState = State.READ_CHUNK_SIZE;
break;
}
}
buffer.readerIndex(rIdx);
return;
}
case READ_CHUNK_FOOTER: try {
LastHttpContent trailer = readTrailingHeaders(buffer);
if (trailer == null) {
return;
}
out.add(trailer);
resetNow();
return;
} catch (Exception e) {
out.add(invalidChunk(buffer, e));
return;
}
case BAD_MESSAGE: {
// Keep discarding until disconnection.
buffer.skipBytes(buffer.readableBytes());
break;
}
case UPGRADED: {
int readableBytes = buffer.readableBytes();
if (readableBytes > 0) {
// Keep on consuming as otherwise we may trigger an DecoderException,
// other handler will replace this codec with the upgraded protocol codec to
// take the traffic over at some point then.
// See https://github.com/netty/netty/issues/2173
out.add(buffer.readBytes(readableBytes));
}
break;
}
default:
break;
}
}

这里注意了在 HttpRequestDecoder 阶段是把chunked的请求处理成普通的带content-length的请求,方便后续处理

编码器

编码的主要逻辑放在了 HttpResponseEncoder 中,主要就是将 FullHttpResponse 对象转化成ByteBuf,按照协议的格式,第一行是 version、status,之后是 headers,然后的body,body会按照是否是chunked来进行编码。这些数据在 ByteBuf 中,最终通过socket发回客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
ByteBuf buf = null;
if (msg instanceof HttpMessage) {
if (state != ST_INIT) {
throw new IllegalStateException("unexpected message type: " + StringUtil.simpleClassName(msg)
+ ", state: " + state);
}

@SuppressWarnings({ "unchecked", "CastConflictsWithInstanceof" })
H m = (H) msg;

buf = ctx.alloc().buffer((int) headersEncodedSizeAccumulator);
// Encode the message.
encodeInitialLine(buf, m);
state = isContentAlwaysEmpty(m) ? ST_CONTENT_ALWAYS_EMPTY :
HttpUtil.isTransferEncodingChunked(m) ? ST_CONTENT_CHUNK : ST_CONTENT_NON_CHUNK;

sanitizeHeadersBeforeEncode(m, state == ST_CONTENT_ALWAYS_EMPTY);

encodeHeaders(m.headers(), buf);
ByteBufUtil.writeShortBE(buf, CRLF_SHORT);

headersEncodedSizeAccumulator = HEADERS_WEIGHT_NEW * padSizeForAccumulation(buf.readableBytes()) +
HEADERS_WEIGHT_HISTORICAL * headersEncodedSizeAccumulator;
}

// Bypass the encoder in case of an empty buffer, so that the following idiom works:
//
// ch.write(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
//
// See https://github.com/netty/netty/issues/2983 for more information.
if (msg instanceof ByteBuf) {
final ByteBuf potentialEmptyBuf = (ByteBuf) msg;
if (!potentialEmptyBuf.isReadable()) {
out.add(potentialEmptyBuf.retain());
return;
}
}

if (msg instanceof HttpContent || msg instanceof ByteBuf || msg instanceof FileRegion) {
switch (state) {
case ST_INIT:
throw new IllegalStateException("unexpected message type: " + StringUtil.simpleClassName(msg)
+ ", state: " + state);
case ST_CONTENT_NON_CHUNK:
final long contentLength = contentLength(msg);
if (contentLength > 0) {
if (buf != null && buf.writableBytes() >= contentLength && msg instanceof HttpContent) {
// merge into other buffer for performance reasons
buf.writeBytes(((HttpContent) msg).content());
out.add(buf);
} else {
if (buf != null) {
out.add(buf);
}
out.add(encodeAndRetain(msg));
}

if (msg instanceof LastHttpContent) {
state = ST_INIT;
}

break;
}

// fall-through!
case ST_CONTENT_ALWAYS_EMPTY:

if (buf != null) {
// We allocated a buffer so add it now.
out.add(buf);
} else {
// Need to produce some output otherwise an
// IllegalStateException will be thrown as we did not write anything
// Its ok to just write an EMPTY_BUFFER as if there are reference count issues these will be
// propagated as the caller of the encode(...) method will release the original
// buffer.
// Writing an empty buffer will not actually write anything on the wire, so if there is a user
// error with msg it will not be visible externally
out.add(Unpooled.EMPTY_BUFFER);
}

break;
case ST_CONTENT_CHUNK:
if (buf != null) {
// We allocated a buffer so add it now.
out.add(buf);
}
encodeChunkedContent(ctx, msg, contentLength(msg), out);

break;
default:
throw new Error();
}

if (msg instanceof LastHttpContent) {
state = ST_INIT;
}
} else if (buf != null) {
out.add(buf);
}
}

总结

解码的过程和之前想的不一样,但是本质上是利用了header中带了后续body的size来进行解析的,有点像TCP包的封装方式

Reference

  1. 几十行代码基于Netty搭建一个 HTTP Server