// BaseContinuationImpl
public final override fun resumeWith(result: Result<Any?>) {
    // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
    var current = this
    var param = result
    while (true) {
        // Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
        // can precisely track what part of suspended callstack was already resumed
        probeCoroutineResumed(current)
        with(current) {
            val completion = completion!! // fail fast when trying to resume continuation without completion
            val outcome: Result<Any?> =
                try {
                    // call invokeSuspend() of the decompiled code shown above
                    val outcome = invokeSuspend(param)
                    if (outcome === COROUTINE_SUSPENDED) return
                    Result.success(outcome)
                } catch (exception: Throwable) {
                    Result.failure(exception)
                }
            releaseIntercepted() // this state machine instance is terminating
            if (completion is BaseContinuationImpl) {
                // unrolling recursion via loop
                current = completion
                param = outcome
            } else {
                // top-level completion reached -- invoke and return
                completion.resumeWith(outcome)
                return
            }
        }
    }
}
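To see what resumeWith is actually driving, here is a deliberately tiny plain-Java sketch of the kind of label-based state machine the Kotlin compiler generates as invokeSuspend. ToyStateMachine and its fields are made-up names for illustration; only the COROUTINE_SUSPENDED sentinel and the label/switch shape mirror the real generated code.

// Toy model only: ToyStateMachine is a made-up name, not one of Kotlin's real classes.
final class ToyStateMachine {
    static final Object COROUTINE_SUSPENDED = new Object();

    int label = 0;  // which suspension point the coroutine is parked at

    // Rough analogue of the generated invokeSuspend(): a switch over the label.
    Object invokeSuspend(Object param) {
        switch (label) {
        case 0:
            label = 1;
            // pretend we started an async call here and have to wait for its result
            return COROUTINE_SUSPENDED;
        case 1:
            // resumed with the async call's result; produce the final value
            return "finished with: " + param;
        default:
            throw new IllegalStateException("resumed after completion");
        }
    }
}

resumeWith keeps calling invokeSuspend like this until it stops getting COROUTINE_SUSPENDED back, and only then wraps the value in Result.success and hands it up the completion chain.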
Concurrency Level:      100
Time taken for tests:   13.406 seconds
Complete requests:      10000
Failed requests:        0
Keep-Alive requests:    10000
Total transferred:      650000 bytes
HTML transferred:       30000 bytes
Requests per second:    745.94 [#/sec] (mean)
Time per request:       134.058 [ms] (mean)
Time per request:       1.341 [ms] (mean, across all concurrent requests)
Transfer rate:          47.35 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    0   2.4      0      28
Processing:   106  132  13.0    129     199
Waiting:      106  132  13.0    129     199
Total:        106  132  13.7    129     199

Percentage of the requests served within a certain time (ms)
  50%    129
  66%    134
  75%    139
  80%    142
  90%    150
  95%    156
  98%    167
  99%    186
 100%    199 (longest request)
Concurrency Level:      100
Time taken for tests:   45.134 seconds
Complete requests:      10000
Failed requests:        17
   (Connect: 0, Receive: 0, Length: 17, Exceptions: 0)
Non-2xx responses:      17
Keep-Alive requests:    9960
Total transferred:      1649167 bytes
HTML transferred:       39078 bytes
Requests per second:    221.56 [#/sec] (mean)
Time per request:       451.336 [ms] (mean)
Time per request:       4.513 [ms] (mean, across all concurrent requests)
Transfer rate:          35.68 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    0   3.4      0      39
Processing:   116  421 2375.9    177   26593
Waiting:      116  421 2375.9    177   26593
Total:        116  421 2375.9    177   26593

Percentage of the requests served within a certain time (ms)
  50%    177
  66%    187
  75%    194
  80%    198
  90%    210
  95%    223
  98%    246
  99%  13434
 100%  26593 (longest request)
for (peer = rrp->peers->peer, i = 0; i < rrp->peers->init_number; i++) {
    peer = peer->next;
}

flag = 1;

for (i = rrp->peers->init_number;
     i != rrp->peers->init_number || flag;
     i = (i + 1) % rrp->peers->number,
         peer = peer->next ? peer->next : rrp->peers->peer)
{
    flag = 0;

#else
for (peer = rrp->peers->peer, i = 0; peer; peer = peer->next, i++) {
#endif

    n = i / (8 * sizeof(uintptr_t));
    m = (uintptr_t) 1 << i % (8 * sizeof(uintptr_t));

    if (rrp->tried[n] & m) {
        continue;
    }

    if (peer->down) {
        continue;
    }

#if (NGX_HTTP_UPSTREAM_CHECK)
    if (ngx_http_upstream_check_peer_down(peer->check_index)) {
        continue;
    }
#endif

    if (peer->max_fails
        && peer->fails >= peer->max_fails
        && now - peer->checked <= peer->fail_timeout)
    {
        continue;
    }

    if (peer->max_conns && peer->conns >= peer->max_conns) {
        continue;
    }

    // note: it is effective_weight (not weight) that gets accumulated here
    peer->current_weight += peer->effective_weight;
    total += peer->effective_weight;

    if (peer->effective_weight < peer->weight) {
        peer->effective_weight++;
    }

    // pick the peer with the largest current_weight
    if (best == NULL || peer->current_weight > best->current_weight) {
        best = peer;
        p = i;
    }
}

if (best == NULL) {
    return NULL;
}

rrp->current = best;

n = p / (8 * sizeof(uintptr_t));
m = (uintptr_t) 1 << p % (8 * sizeof(uintptr_t));
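The current_weight / effective_weight bookkeeping above is nginx's smooth weighted round-robin; right after the excerpt ends, nginx marks the chosen peer in the tried bitmap and does best->current_weight -= total before returning. The following stand-alone sketch (Peer is a made-up Java class; the down/fail/max_conns checks are left out) replays just that arithmetic to show the effect: with weights 5, 1, 1 the picks come out interleaved as a a b a c a a rather than clustered as a a a a a b c.

import java.util.List;

// Stand-alone illustration of nginx's smooth weighted round-robin selection.
// "Peer" here is a made-up class, not ngx_http_upstream_rr_peer_t.
public final class SmoothWrrDemo {
    static final class Peer {
        final String name;
        final int weight;          // configured weight (peer->weight)
        int effectiveWeight;       // decays on failures, recovers over time (peer->effective_weight)
        int currentWeight;         // running score used for selection (peer->current_weight)

        Peer(String name, int weight) {
            this.name = name;
            this.weight = weight;
            this.effectiveWeight = weight;
        }
    }

    // One selection round: mirrors the loop above, minus the health checks.
    static Peer pick(List<Peer> peers) {
        Peer best = null;
        int total = 0;
        for (Peer p : peers) {
            p.currentWeight += p.effectiveWeight;   // it is effective_weight that gets added
            total += p.effectiveWeight;
            if (p.effectiveWeight < p.weight) {
                p.effectiveWeight++;                // slowly recover after failures
            }
            if (best == null || p.currentWeight > best.currentWeight) {
                best = p;                           // pick the largest current_weight
            }
        }
        best.currentWeight -= total;                // the step nginx does after choosing `best`
        return best;
    }

    public static void main(String[] args) {
        List<Peer> peers = List.of(new Peer("a", 5), new Peer("b", 1), new Peer("c", 1));
        StringBuilder order = new StringBuilder();
        for (int i = 0; i < 7; i++) {
            order.append(pick(peers).name).append(' ');
        }
        System.out.println(order);  // prints: a a b a c a a
    }
}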
/* List of References waiting to be enqueued.  The collector adds
 * References to this list, while the Reference-handler thread removes
 * them.  This list is protected by the above lock object. The
 * list uses the discovered field to link its elements.
 */
private static Reference<Object> pending = null;
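That pending list is the hand-off point between the garbage collector and the Reference-handler thread, which drains it and pushes each cleared Reference onto the ReferenceQueue it was registered with. A minimal sketch of what that looks like from application code (class and variable names are just for the demo):

import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;

public final class ReferenceQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        ReferenceQueue<Object> demoQueue = new ReferenceQueue<>();
        WeakReference<Object> ref = new WeakReference<>(new Object(), demoQueue);

        // Once the referent becomes unreachable and is collected, the GC links the
        // Reference into the pending list; the Reference-handler thread then drains
        // that list and enqueues the Reference onto demoQueue.
        System.gc();

        // remove() blocks until the Reference-handler thread has done its work
        // (a timeout is used because GC is not guaranteed to run promptly).
        Reference<?> enqueued = demoQueue.remove(2000);
        System.out.println(enqueued == ref
                ? "reference was enqueued by the Reference-handler thread"
                : "not enqueued yet (GC did not clear it within the timeout)");
    }
}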
switch (currentState) {
case SKIP_CONTROL_CHARS:
    // Fall-through
case READ_INITIAL: try {
    AppendableCharSequence line = lineParser.parse(buffer);
    if (line == null) {
        return;
    }
    String[] initialLine = splitInitialLine(line);
    if (initialLine.length < 3) {
        // Invalid initial line - ignore.
        currentState = State.SKIP_CONTROL_CHARS;
        return;
    }

    message = createMessage(initialLine);
    currentState = State.READ_HEADER;
    // fall-through
} catch (Exception e) {
    out.add(invalidMessage(buffer, e));
    return;
}
case READ_HEADER: try {
    State nextState = readHeaders(buffer);
    if (nextState == null) {
        return;
    }
    currentState = nextState;
    switch (nextState) {
    case SKIP_CONTROL_CHARS:
        // fast-path
        // No content is expected.
        out.add(message);
        out.add(LastHttpContent.EMPTY_LAST_CONTENT);
        resetNow();
        return;
    case READ_CHUNK_SIZE:
        if (!chunkedSupported) {
            throw new IllegalArgumentException("Chunked messages not supported");
        }
        // Chunked encoding - generate HttpMessage first. HttpChunks will follow.
        out.add(message);
        return;
    default:
        /**
         * <a href="https://tools.ietf.org/html/rfc7230#section-3.3.3">RFC 7230, 3.3.3</a> states that if a
         * request does not have either a transfer-encoding or a content-length header then the message body
         * length is 0. However for a response the body length is the number of octets received prior to the
         * server closing the connection. So we treat this as variable length chunked encoding.
         */
        long contentLength = contentLength();
        if (contentLength == 0 || contentLength == -1 && isDecodingRequest()) {
            out.add(message);
            out.add(LastHttpContent.EMPTY_LAST_CONTENT);
            resetNow();
            return;
        }

        if (nextState == State.READ_FIXED_LENGTH_CONTENT) {
            // chunkSize will be decreased as the READ_FIXED_LENGTH_CONTENT state reads data chunk by chunk.
            chunkSize = contentLength;
        }

        // We return here, this forces decode to be called again where we will decode the content
        return;
    }
} catch (Exception e) {
    out.add(invalidMessage(buffer, e));
    return;
}
case READ_VARIABLE_LENGTH_CONTENT: {
    // Keep reading data as a chunk until the end of connection is reached.
    int toRead = Math.min(buffer.readableBytes(), maxChunkSize);
    if (toRead > 0) {
        ByteBuf content = buffer.readRetainedSlice(toRead);
        out.add(new DefaultHttpContent(content));
    }
    return;
}
case READ_FIXED_LENGTH_CONTENT: {
    int readLimit = buffer.readableBytes();

    // Check if the buffer is readable first as we use the readable byte count
    // to create the HttpChunk. This is needed as otherwise we may end up with
    // create an HttpChunk instance that contains an empty buffer and so is
    // handled like it is the last HttpChunk.
    //
    // See https://github.com/netty/netty/issues/433
    if (readLimit == 0) {
        return;
    }
    int toRead = Math.min(readLimit, maxChunkSize);
    if (toRead > chunkSize) {
        toRead = (int) chunkSize;
    }
    ByteBuf content = buffer.readRetainedSlice(toRead);
    chunkSize -= toRead;

    if (chunkSize == 0) {
        // Read all content.
        out.add(new DefaultLastHttpContent(content, validateHeaders));
        resetNow();
    } else {
        out.add(new DefaultHttpContent(content));
    }
    return;
}
/**
 * everything else after this point takes care of reading chunked content. basically, read chunk size,
 * read chunk, read and ignore the CRLF and repeat until 0
 */
case READ_CHUNK_SIZE: try {
    AppendableCharSequence line = lineParser.parse(buffer);
    if (line == null) {
        return;
    }
    int chunkSize = getChunkSize(line.toString());
    this.chunkSize = chunkSize;
    if (chunkSize == 0) {
        currentState = State.READ_CHUNK_FOOTER;
        return;
    }
    currentState = State.READ_CHUNKED_CONTENT;
    // fall-through
} catch (Exception e) {
    out.add(invalidChunk(buffer, e));
    return;
}
case READ_CHUNKED_CONTENT: {
    assert chunkSize <= Integer.MAX_VALUE;
    int toRead = Math.min((int) chunkSize, maxChunkSize);
    if (!allowPartialChunks && buffer.readableBytes() < toRead) {
        return;
    }
    toRead = Math.min(toRead, buffer.readableBytes());
    if (toRead == 0) {
        return;
    }
    HttpContent chunk = new DefaultHttpContent(buffer.readRetainedSlice(toRead));
    chunkSize -= toRead;

    out.add(chunk);

    if (chunkSize != 0) {
        return;
    }
    currentState = State.READ_CHUNK_DELIMITER;
    // fall-through
}
case READ_CHUNK_DELIMITER: {
    final int wIdx = buffer.writerIndex();
    int rIdx = buffer.readerIndex();
    while (wIdx > rIdx) {
        byte next = buffer.getByte(rIdx++);
        if (next == HttpConstants.LF) {
            currentState = State.READ_CHUNK_SIZE;
            break;
        }
    }
    buffer.readerIndex(rIdx);
    return;
}
case READ_CHUNK_FOOTER: try {
    LastHttpContent trailer = readTrailingHeaders(buffer);
    if (trailer == null) {
        return;
    }
    out.add(trailer);
    resetNow();
    return;
} catch (Exception e) {
    out.add(invalidChunk(buffer, e));
    return;
}
case BAD_MESSAGE: {
    // Keep discarding until disconnection.
    buffer.skipBytes(buffer.readableBytes());
    break;
}
case UPGRADED: {
    int readableBytes = buffer.readableBytes();
    if (readableBytes > 0) {
        // Keep on consuming as otherwise we may trigger an DecoderException,
        // other handler will replace this codec with the upgraded protocol codec to
        // take the traffic over at some point then.
        // See https://github.com/netty/netty/issues/2173
        out.add(buffer.readBytes(readableBytes));
    }
    break;
}
default:
    break;
}
}
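For context on where this decode() state machine sits: HttpRequestDecoder, a concrete subclass of the HttpObjectDecoder shown above, emits an HttpMessage, then HttpContent chunks, then a LastHttpContent. Below is a minimal sketch of a server pipeline consuming that stream, with HttpObjectAggregator reassembling the pieces into a single FullHttpRequest; the pipeline layout is illustrative, while the classes themselves are standard Netty 4.x API.

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;

public final class HttpDecoderPipelineDemo {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup boss = new NioEventLoopGroup(1);
        EventLoopGroup workers = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap()
                    .group(boss, workers)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ch.pipeline()
                              // runs the decode() state machine shown above
                              .addLast(new HttpRequestDecoder())
                              // merges HttpMessage + HttpContent... + LastHttpContent into one FullHttpRequest
                              .addLast(new HttpObjectAggregator(1024 * 1024))
                              .addLast(new HttpResponseEncoder())
                              .addLast(new SimpleChannelInboundHandler<FullHttpRequest>() {
                                  @Override
                                  protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest req) {
                                      FullHttpResponse res = new DefaultFullHttpResponse(
                                              HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
                                      res.headers().set(HttpHeaderNames.CONTENT_LENGTH, 0);
                                      ctx.writeAndFlush(res);
                                  }
                              });
                        }
                    });
            b.bind(8080).sync().channel().closeFuture().sync();
        } finally {
            boss.shutdownGracefully();
            workers.shutdownGracefully();
        }
    }
}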
// Bypass the encoder in case of an empty buffer, so that the following idiom works:
//
//     ch.write(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
//
// See https://github.com/netty/netty/issues/2983 for more information.
if (msg instanceof ByteBuf) {
    final ByteBuf potentialEmptyBuf = (ByteBuf) msg;
    if (!potentialEmptyBuf.isReadable()) {
        out.add(potentialEmptyBuf.retain());
        return;
    }
}

if (msg instanceof HttpContent || msg instanceof ByteBuf || msg instanceof FileRegion) {
    switch (state) {
    case ST_INIT:
        throw new IllegalStateException("unexpected message type: "
                + StringUtil.simpleClassName(msg) + ", state: " + state);
    case ST_CONTENT_NON_CHUNK:
        final long contentLength = contentLength(msg);
        if (contentLength > 0) {
            if (buf != null && buf.writableBytes() >= contentLength && msg instanceof HttpContent) {
                // merge into other buffer for performance reasons
                buf.writeBytes(((HttpContent) msg).content());
                out.add(buf);
            } else {
                if (buf != null) {
                    out.add(buf);
                }
                out.add(encodeAndRetain(msg));
            }

            if (msg instanceof LastHttpContent) {
                state = ST_INIT;
            }

            break;
        }

        // fall-through!
    case ST_CONTENT_ALWAYS_EMPTY:

        if (buf != null) {
            // We allocated a buffer so add it now.
            out.add(buf);
        } else {
            // Need to produce some output otherwise an
            // IllegalStateException will be thrown as we did not write anything
            // Its ok to just write an EMPTY_BUFFER as if there are reference count issues these will be
            // propagated as the caller of the encode(...) method will release the original
            // buffer.
            // Writing an empty buffer will not actually write anything on the wire, so if there is a user
            // error with msg it will not be visible externally
            out.add(Unpooled.EMPTY_BUFFER);
        }

        break;
    case ST_CONTENT_CHUNK:
        if (buf != null) {
            // We allocated a buffer so add it now.
            out.add(buf);
        }
        encodeChunkedContent(ctx, msg, contentLength(msg), out);

        break;
    default:
        throw new Error();
    }

    if (msg instanceof LastHttpContent) {
        state = ST_INIT;
    }
} else if (buf != null) {
    out.add(buf);
}
}
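Which case of that switch a later HttpContent falls into is decided when the preceding HttpResponse is encoded: marking it Transfer-Encoding: chunked puts the encoder into ST_CONTENT_CHUNK, otherwise fixed-length content goes through ST_CONTENT_NON_CHUNK. Here is a small sketch of deliberately driving the chunked path; the helper class below is written for illustration and is not part of Netty, while the HttpUtil / DefaultHttpContent calls are standard Netty 4.x API.

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;

final class ChunkedResponseWriter {
    // Illustrative helper, not part of Netty: streams a response body as HTTP chunks.
    static void writeChunked(ChannelHandlerContext ctx, String... parts) {
        HttpResponse head = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
        HttpUtil.setTransferEncodingChunked(head, true);   // switches the encoder to ST_CONTENT_CHUNK
        ctx.write(head);

        for (String part : parts) {
            // each HttpContent goes through encodeChunkedContent(): size line + data + CRLF
            ctx.write(new DefaultHttpContent(Unpooled.copiedBuffer(part, CharsetUtil.UTF_8)));
        }

        // LastHttpContent emits the terminating zero-length chunk and resets the encoder to ST_INIT
        ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT)
           .addListener(ChannelFutureListener.CLOSE);
    }
}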