0%

Netty HTTP 编解码简析

简介

Netty是一个高性能的NIO框架,它工作在传输层(四层,TCP、UDP等)之上,我们可以基于它构建各种类型的七层(应用层)应用,例如HTTP服务(HTTP协议)、长连接Socket Server(自定义协议)等等。TCP是面向字节流的协议,本身没有"包"的边界:发送端的数据会按MSS(以太网下通常约1460字节)被切分成多个报文段发送(65535字节是IP数据报的长度上限,而不是TCP"包"的大小)。因此一个较大的HTTP请求可能会分多次到达,接收端需要把这些数据重新组合成完整的HTTP消息(整个组合过程对HTTP业务层是透明的,业务层不会感知到)。我们来看看Netty是如何处理的。

测试代码

使用下面这段代码测试,其中比较重要的是 HttpRequestDecoder、HttpObjectAggregator、HttpServerHandler,三个都是ChannelInboundHandler,用于处理接收到的请求;而HttpResponseEncoder则相反,是ChannelOutboundHandler,用来处理返回给客户端的响应。其中boss线程组负责accept客户端的连接,worker线程组负责处理和客户端之间的数据读写通信。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
/**
 * Minimal Netty HTTP server used to observe how requests are decoded and responses encoded.
 *
 * <p>The boss group accepts connections; the worker group performs all reads/writes for the
 * accepted channels.
 */
public class NettyHttpServer {
    private static final int PORT = 8888;

    public static void main(String[] args) {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(new DefaultThreadFactory("boss"));
        NioEventLoopGroup workerGroup = new NioEventLoopGroup(4, new DefaultThreadFactory("worker"));

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    // option(...) configures the listening (server) channel. CONNECT_TIMEOUT_MILLIS
                    // was dropped here: a listening channel never connects, so it had no effect.
                    .option(ChannelOption.SO_BACKLOG, 120)
                    .childHandler(new ChannelInitializer<Channel>() {
                        @Override
                        protected void initChannel(Channel channel) throws Exception {
                            // A fresh pipeline (and fresh decoder instances) per accepted connection.
                            channel.pipeline().addLast(new IdleStateHandler(10, 10, 10, TimeUnit.MINUTES));
                            channel.pipeline().addLast(new HttpRequestDecoder());
                            channel.pipeline().addLast(new HttpResponseEncoder());
                            // Merges HttpRequest + HttpContent pieces into one FullHttpRequest (max 4 MiB body).
                            channel.pipeline().addLast(new HttpObjectAggregator(4 * 1024 * 1024));
                            channel.pipeline().addLast(new HttpServerHandler());
                        }
                    })
                    // childOption(...) configures each accepted socket. TCP_NODELAY is a per-connection
                    // socket option, so it must be set here rather than on the server channel.
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture channelFuture = serverBootstrap.bind(PORT).sync();
            // Block until the server channel is closed.
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Preserve the interrupt status for whoever owns this thread.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            // e.g. the port is already in use.
            e.printStackTrace();
        } finally {
            // Release event-loop threads on every exit path, not only on failure.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

public class HttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
private static final String FAVICON_ICO = "/favicon.ico";
private static final AsciiString CONNECTION = AsciiString.cached("Connection");
private static final AsciiString KEEP_ALIVE = AsciiString.cached("keep-alive");
private static final AsciiString CONTENT_TYPE = AsciiString.cached("Content-Type");
private static final AsciiString CONTENT_LENGTH = AsciiString.cached("Content-Length");

@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest fullHttpRequest) {
String uri = fullHttpRequest.uri();
if (uri.equals(FAVICON_ICO)) {
return;
}
Object result;
FullHttpResponse response;
try {
byte[] responseBytes = "{\"status\":0}".getBytes();
response = new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(responseBytes));
response.headers().set(CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON);
response.headers().setInt(CONTENT_LENGTH, response.content().readableBytes());
} catch (IllegalArgumentException e) {
e.printStackTrace();
byte[] responseBytes = "{\"status\":1}".getBytes();
response = new DefaultFullHttpResponse(HTTP_1_1, INTERNAL_SERVER_ERROR, Unpooled.wrappedBuffer(responseBytes));
response.headers().set(CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON);
}
boolean keepAlive = HttpUtil.isKeepAlive(fullHttpRequest);
if (!keepAlive) {
ctx.write(response).addListener(ChannelFutureListener.CLOSE);
} else {
response.headers().set(CONNECTION, KEEP_ALIVE);
ctx.write(response);
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
}

解码器

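解码的主要工作在 HttpRequestDecoder 和 HttpObjectAggregator 中。其中 HttpRequestDecoder 负责把从Socket接收到的字节(Netty中的ByteBuf)解析成不同的HTTP对象,例如:HttpMessage(对请求来说就是HttpRequest,包含method、uri、version以及所有header,基本就是除body之外的数据)和HttpContent(body数据,主要包括 DefaultHttpContent 和表示消息结束的 DefaultLastHttpContent)。HttpObjectAggregator 则把同一个请求的 HttpRequest 和若干个 HttpContent 聚合成一个完整的 FullHttpRequest,再交给后面的 HttpServerHandler 处理。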
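HttpRequestDecoder 的父类 HttpObjectDecoder 的 decode 方法通过状态机的方式处理请求,这也回答了最开始提出的问题:如果当前缓冲区里的数据还不足以完成某一步(例如一行还没有读完),decode 会直接返回并保留当前状态;由于 HttpObjectDecoder 继承自 ByteToMessageDecoder,后续到达的数据会被累积到同一个缓冲区中,再次调用 decode 时会从上次的状态继续解析。解析顺序是:先读取initial line(包括method、uri、version),然后读取headers,再根据header里的数据来读取body。body主要分为chunked和普通body两种:普通body按Content-Length读取;chunked body由若干个chunk组成,每个chunk是"十六进制的chunk size行 + chunk数据 + CRLF",最后一个chunk的size为0,其后是可选的trailer header。由于每个socket连接(Channel)都有自己的一个Decoder实例,并且一个Channel的事件总是由同一个EventLoop线程处理,因此不存在线程竞争的问题。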

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
/**
 * Decodes as many HTTP objects as the currently buffered bytes allow, driven by the
 * {@code currentState} state machine.
 *
 * <p>Each call resumes from the state left by the previous call. When the buffer does not yet
 * hold enough bytes for the current step, the method returns without emitting anything and keeps
 * its state, so a message that arrives across several TCP reads is decoded incrementally.
 * NOTE(review): re-invoking this method with the accumulated bytes is the job of the enclosing
 * decoder base class, which is not shown in this excerpt.
 *
 * <p>Objects appended to {@code out}: the parsed {@code message} (initial line + headers), then
 * zero or more {@code DefaultHttpContent} body pieces, terminated by a {@code LastHttpContent}.
 * Parse failures in most states are reported by adding {@code invalidMessage(...)} /
 * {@code invalidChunk(...)} results instead of throwing.
 *
 * @param ctx    channel handler context (not referenced in this method body)
 * @param buffer bytes received so far; consumed bytes advance its reader index
 * @param out    decoded HTTP objects are appended here
 * @throws Exception as declared; most parse errors are caught and emitted as invalid objects
 */
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
// Apply a pending reset (flag presumably set from outside this method) before decoding new bytes.
if (resetRequested) {
resetNow();
}

switch (currentState) {
case SKIP_CONTROL_CHARS:
// Fall-through
case READ_INITIAL: try {
// Parse the initial line (method/uri/version for a request). A null result means no
// complete line is available yet (presumably; lineParser is defined elsewhere) -> wait.
AppendableCharSequence line = lineParser.parse(buffer);
if (line == null) {
return;
}
String[] initialLine = splitInitialLine(line);
if (initialLine.length < 3) {
// Invalid initial line - ignore.
currentState = State.SKIP_CONTROL_CHARS;
return;
}

message = createMessage(initialLine);
currentState = State.READ_HEADER;
// fall-through
} catch (Exception e) {
out.add(invalidMessage(buffer, e));
return;
}
case READ_HEADER: try {
// readHeaders returns the state that follows the header block, or null when the
// headers are not complete yet (in which case we wait for more bytes).
State nextState = readHeaders(buffer);
if (nextState == null) {
return;
}
currentState = nextState;
switch (nextState) {
case SKIP_CONTROL_CHARS:
// fast-path
// No content is expected.
out.add(message);
out.add(LastHttpContent.EMPTY_LAST_CONTENT);
resetNow();
return;
case READ_CHUNK_SIZE:
if (!chunkedSupported) {
throw new IllegalArgumentException("Chunked messages not supported");
}
// Chunked encoding - generate HttpMessage first. HttpChunks will follow.
out.add(message);
return;
default:
/**
* <a href="https://tools.ietf.org/html/rfc7230#section-3.3.3">RFC 7230, 3.3.3</a> states that if a
* request does not have either a transfer-encoding or a content-length header then the message body
* length is 0. However for a response the body length is the number of octets received prior to the
* server closing the connection. So we treat this as variable length chunked encoding.
*/
long contentLength = contentLength();
// Note operator precedence: this is `length == 0 || (length == -1 && decoding a request)`.
// Either way there is no body, so emit the message with an empty last content.
if (contentLength == 0 || contentLength == -1 && isDecodingRequest()) {
out.add(message);
out.add(LastHttpContent.EMPTY_LAST_CONTENT);
resetNow();
return;
}

assert nextState == State.READ_FIXED_LENGTH_CONTENT ||
nextState == State.READ_VARIABLE_LENGTH_CONTENT;

out.add(message);

if (nextState == State.READ_FIXED_LENGTH_CONTENT) {
// chunkSize will be decreased as the READ_FIXED_LENGTH_CONTENT state reads data chunk by chunk.
chunkSize = contentLength;
}

// We return here, this forces decode to be called again where we will decode the content
return;
}
} catch (Exception e) {
out.add(invalidMessage(buffer, e));
return;
}
case READ_VARIABLE_LENGTH_CONTENT: {
// Keep reading data as a chunk until the end of connection is reached.
// Each emitted piece is at most maxChunkSize bytes; no LastHttpContent is produced here.
int toRead = Math.min(buffer.readableBytes(), maxChunkSize);
if (toRead > 0) {
ByteBuf content = buffer.readRetainedSlice(toRead);
out.add(new DefaultHttpContent(content));
}
return;
}
case READ_FIXED_LENGTH_CONTENT: {
// Body with a known length: chunkSize holds the number of body bytes still expected.
int readLimit = buffer.readableBytes();

// Check if the buffer is readable first as we use the readable byte count
// to create the HttpChunk. This is needed as otherwise we may end up with
// create an HttpChunk instance that contains an empty buffer and so is
// handled like it is the last HttpChunk.
//
// See https://github.com/netty/netty/issues/433
if (readLimit == 0) {
return;
}

// Read no more than maxChunkSize per piece and never past the end of this body.
int toRead = Math.min(readLimit, maxChunkSize);
if (toRead > chunkSize) {
toRead = (int) chunkSize;
}
ByteBuf content = buffer.readRetainedSlice(toRead);
chunkSize -= toRead;

if (chunkSize == 0) {
// Read all content.
out.add(new DefaultLastHttpContent(content, validateHeaders));
resetNow();
} else {
out.add(new DefaultHttpContent(content));
}
return;
}
/**
* everything else after this point takes care of reading chunked content. basically, read chunk size,
* read chunk, read and ignore the CRLF and repeat until 0
*/
case READ_CHUNK_SIZE: try {
// Parse the chunk-size line; null means the line is not complete yet.
AppendableCharSequence line = lineParser.parse(buffer);
if (line == null) {
return;
}
// This local shadows the field; the parsed size is copied into this.chunkSize below.
int chunkSize = getChunkSize(line.toString());
this.chunkSize = chunkSize;
if (chunkSize == 0) {
// A zero-size chunk ends the body; only the trailer section remains.
currentState = State.READ_CHUNK_FOOTER;
return;
}
currentState = State.READ_CHUNKED_CONTENT;
// fall-through
} catch (Exception e) {
out.add(invalidChunk(buffer, e));
return;
}
case READ_CHUNKED_CONTENT: {
assert chunkSize <= Integer.MAX_VALUE;
int toRead = Math.min((int) chunkSize, maxChunkSize);
// Unless partial chunks are allowed, wait until the whole (capped) piece is buffered.
if (!allowPartialChunks && buffer.readableBytes() < toRead) {
return;
}
toRead = Math.min(toRead, buffer.readableBytes());
if (toRead == 0) {
return;
}
HttpContent chunk = new DefaultHttpContent(buffer.readRetainedSlice(toRead));
chunkSize -= toRead;

out.add(chunk);

if (chunkSize != 0) {
return;
}
currentState = State.READ_CHUNK_DELIMITER;
// fall-through
}
case READ_CHUNK_DELIMITER: {
// Skip the line terminator after the chunk data: consume bytes up to and including the
// next LF. If no LF is buffered yet, all readable bytes are skipped and the state is kept.
final int wIdx = buffer.writerIndex();
int rIdx = buffer.readerIndex();
while (wIdx > rIdx) {
byte next = buffer.getByte(rIdx++);
if (next == HttpConstants.LF) {
currentState = State.READ_CHUNK_SIZE;
break;
}
}
buffer.readerIndex(rIdx);
return;
}
case READ_CHUNK_FOOTER: try {
// Read the trailer headers after the last chunk; null means they are not complete yet.
LastHttpContent trailer = readTrailingHeaders(buffer);
if (trailer == null) {
return;
}
out.add(trailer);
resetNow();
return;
} catch (Exception e) {
out.add(invalidChunk(buffer, e));
return;
}
case BAD_MESSAGE: {
// Keep discarding until disconnection.
buffer.skipBytes(buffer.readableBytes());
break;
}
case UPGRADED: {
// After a protocol upgrade, raw bytes are passed through undecoded.
int readableBytes = buffer.readableBytes();
if (readableBytes > 0) {
// Keep on consuming as otherwise we may trigger an DecoderException,
// other handler will replace this codec with the upgraded protocol codec to
// take the traffic over at some point then.
// See https://github.com/netty/netty/issues/2173
out.add(buffer.readBytes(readableBytes));
}
break;
}
default:
break;
}
}

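这里注意:HttpRequestDecoder 本身并不会合并chunked请求,它只是把body解析成一系列 HttpContent(最后一个是 LastHttpContent)。真正把它们合并成普通的、带Content-Length的请求的是后面的 HttpObjectAggregator:它会把所有内容聚合成一个 FullHttpRequest,去掉 Transfer-Encoding: chunked,并在没有Content-Length时按聚合后的body长度补上,方便后续处理。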

编码器

编码的主要逻辑在 HttpResponseEncoder(其父类 HttpObjectEncoder 的 encode 方法)中,主要就是把 FullHttpResponse 对象转换成ByteBuf。按照协议格式,第一行是状态行(version、status code 和 reason phrase),之后是 headers 和一个空行,然后是body;body会根据响应是否设置了 Transfer-Encoding: chunked 来决定是否按chunk编码。这些数据写入 ByteBuf 后,最终通过socket发回客户端。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
/**
 * Encodes one outbound HTTP object into buffers appended to {@code out}.
 *
 * <p>An {@code HttpMessage} is written as its initial line, the headers and a terminating CRLF.
 * It also selects, via {@code state}, how the content that follows is framed:
 * {@code ST_CONTENT_ALWAYS_EMPTY} (no body), {@code ST_CONTENT_CHUNK} when the message uses
 * chunked transfer encoding, or {@code ST_CONTENT_NON_CHUNK} (body bytes written as-is).
 * The two {@code if} sections below are not exclusive: a message that is also
 * {@code HttpContent} (e.g. a {@code FullHttpResponse}) has its head and body encoded in the
 * same call. A {@code LastHttpContent} returns the encoder to {@code ST_INIT}.
 *
 * @param ctx used to allocate the header buffer
 * @param msg an HttpMessage, HttpContent, ByteBuf or FileRegion to encode
 * @param out encoded buffers / pass-through objects are appended here
 * @throws IllegalStateException if a message arrives while the previous one is unfinished, or
 *                               content arrives before any message
 */
protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
ByteBuf buf = null;
if (msg instanceof HttpMessage) {
// A new message may only start after the previous one ended with LastHttpContent.
if (state != ST_INIT) {
throw new IllegalStateException("unexpected message type: " + StringUtil.simpleClassName(msg)
+ ", state: " + state);
}

@SuppressWarnings({ "unchecked", "CastConflictsWithInstanceof" })
H m = (H) msg;

// Pre-size the header buffer from a running estimate of previous header sizes.
buf = ctx.alloc().buffer((int) headersEncodedSizeAccumulator);
// Encode the message.
encodeInitialLine(buf, m);
state = isContentAlwaysEmpty(m) ? ST_CONTENT_ALWAYS_EMPTY :
HttpUtil.isTransferEncodingChunked(m) ? ST_CONTENT_CHUNK : ST_CONTENT_NON_CHUNK;

sanitizeHeadersBeforeEncode(m, state == ST_CONTENT_ALWAYS_EMPTY);

encodeHeaders(m.headers(), buf);
// Blank line separating headers from the body.
ByteBufUtil.writeShortBE(buf, CRLF_SHORT);

// Weighted blend of this header size and the historical estimate, used for the next allocation.
headersEncodedSizeAccumulator = HEADERS_WEIGHT_NEW * padSizeForAccumulation(buf.readableBytes()) +
HEADERS_WEIGHT_HISTORICAL * headersEncodedSizeAccumulator;
}

// Bypass the encoder in case of an empty buffer, so that the following idiom works:
//
// ch.write(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
//
// See https://github.com/netty/netty/issues/2983 for more information.
if (msg instanceof ByteBuf) {
final ByteBuf potentialEmptyBuf = (ByteBuf) msg;
if (!potentialEmptyBuf.isReadable()) {
out.add(potentialEmptyBuf.retain());
return;
}
}

if (msg instanceof HttpContent || msg instanceof ByteBuf || msg instanceof FileRegion) {
switch (state) {
case ST_INIT:
// Content without a preceding message head.
throw new IllegalStateException("unexpected message type: " + StringUtil.simpleClassName(msg)
+ ", state: " + state);
case ST_CONTENT_NON_CHUNK:
final long contentLength = contentLength(msg);
if (contentLength > 0) {
if (buf != null && buf.writableBytes() >= contentLength && msg instanceof HttpContent) {
// merge into other buffer for performance reasons
buf.writeBytes(((HttpContent) msg).content());
out.add(buf);
} else {
if (buf != null) {
out.add(buf);
}
out.add(encodeAndRetain(msg));
}

if (msg instanceof LastHttpContent) {
state = ST_INIT;
}

break;
}

// fall-through!
// (zero-length content is handled like an always-empty body)
case ST_CONTENT_ALWAYS_EMPTY:

if (buf != null) {
// We allocated a buffer so add it now.
out.add(buf);
} else {
// Need to produce some output otherwise an
// IllegalStateException will be thrown as we did not write anything
// Its ok to just write an EMPTY_BUFFER as if there are reference count issues these will be
// propagated as the caller of the encode(...) method will release the original
// buffer.
// Writing an empty buffer will not actually write anything on the wire, so if there is a user
// error with msg it will not be visible externally
out.add(Unpooled.EMPTY_BUFFER);
}

break;
case ST_CONTENT_CHUNK:
if (buf != null) {
// We allocated a buffer so add it now.
out.add(buf);
}
// Frame the content as chunk(s); see encodeChunkedContent (not shown here).
encodeChunkedContent(ctx, msg, contentLength(msg), out);

break;
default:
// Unreachable: every state value is handled above.
throw new Error();
}

if (msg instanceof LastHttpContent) {
state = ST_INIT;
}
} else if (buf != null) {
// Message head only (no content part in this object): emit the encoded head.
out.add(buf);
}
}

总结

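解码的过程和之前设想的不一样:Netty 并不关心TCP是如何拆分数据的,而是把收到的字节不断累积,再按HTTP协议本身的规则确定消息边界——header中的 Content-Length 给出了body的长度,chunked编码则在每个chunk前给出该chunk的长度。本质上,这和常见的"长度字段 + 内容"的自定义协议帧格式(例如 Netty 的 LengthFieldBasedFrameDecoder)是同一个思路。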

Reference

  1. 几十行代码基于Netty搭建一个 HTTP Server
如果您觉得这些内容对您有帮助,你可以赞助我以提高站点的文章质量