-- Token-bucket refill step of a Redis rate-limiter script (fragment: the
-- surrounding script supplies tokens_key, timestamp_key, capacity, rate,
-- requested and now, and continues after this point using new_tokens /
-- allowed_num).

-- Tokens left after the previous call; a missing key means a full bucket.
local last_tokens = tonumber(redis.call("get", tokens_key))
if last_tokens == nil then
  last_tokens = capacity
end
--redis.log(redis.LOG_WARNING, "last_tokens " .. last_tokens)

-- Timestamp of the last refill; defaults to 0 when the key is unset.
local last_refreshed = tonumber(redis.call("get", timestamp_key))
if last_refreshed == nil then
  last_refreshed = 0
end
--redis.log(redis.LOG_WARNING, "last_refreshed " .. last_refreshed)

-- Refill tokens for the elapsed time (rate tokens per time unit), capped at
-- capacity, then try to take `requested` tokens. allowed_num is 1 when the
-- request is admitted, 0 when it is rejected.
local delta = math.max(0, now-last_refreshed)
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
local allowed = filled_tokens >= requested
local new_tokens = filled_tokens
local allowed_num = 0
if allowed then
  new_tokens = filled_tokens - requested
  allowed_num = 1
end
// Start running the query that prints the running counts to the console StreamingQuery query = windowedCounts.writeStream() .outputMode("complete") .format("console") .option("truncate", false) .start();
// BaseContinuationImpl
// Resumes this continuation with [result]. Recursion through
// completion.resumeWith is unrolled into a loop for shorter stack traces.
public final override fun resumeWith(result: Result<Any?>) {
    // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
    var current = this
    var param = result
    while (true) {
        // Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
        // can precisely track what part of suspended callstack was already resumed
        probeCoroutineResumed(current)
        with(current) {
            val completion = completion!! // fail fast when trying to resume continuation without completion
            val outcome: Result<Any?> =
                try {
                    // calls the invokeSuspend function of the decompiled code shown above
                    val outcome = invokeSuspend(param)
                    // still suspended: bail out without completing anything
                    if (outcome === COROUTINE_SUSPENDED) return
                    Result.success(outcome)
                } catch (exception: Throwable) {
                    Result.failure(exception)
                }
            releaseIntercepted() // this state machine instance is terminating
            if (completion is BaseContinuationImpl) {
                // unrolling recursion via loop
                current = completion
                param = outcome
            } else {
                // top-level completion reached -- invoke and return
                completion.resumeWith(outcome)
                return
            }
        }
    }
}
Concurrency Level: 100 Time taken for tests: 13.406 seconds Complete requests: 10000 Failed requests: 0 Keep-Alive requests: 10000 Total transferred: 650000 bytes HTML transferred: 30000 bytes Requests per second: 745.94 [#/sec] (mean) Time per request: 134.058 [ms] (mean) Time per request: 1.341 [ms] (mean, across all concurrent requests) Transfer rate: 47.35 [Kbytes/sec] received
Connection Times (ms) min mean[+/-sd] median max Connect: 0 0 2.4 0 28 Processing: 106 132 13.0 129 199 Waiting: 106 132 13.0 129 199 Total: 106 132 13.7 129 199
Percentage of the requests served within a certain time (ms) 50% 129 66% 134 75% 139 80% 142 90% 150 95% 156 98% 167 99% 186 100% 199 (longest request)
Concurrency Level: 100 Time taken for tests: 45.134 seconds Complete requests: 10000 Failed requests: 17 (Connect: 0, Receive: 0, Length: 17, Exceptions: 0) Non-2xx responses: 17 Keep-Alive requests: 9960 Total transferred: 1649167 bytes HTML transferred: 39078 bytes Requests per second: 221.56 [#/sec] (mean) Time per request: 451.336 [ms] (mean) Time per request: 4.513 [ms] (mean, across all concurrent requests) Transfer rate: 35.68 [Kbytes/sec] received
Connection Times (ms) min mean[+/-sd] median max Connect: 0 0 3.4 0 39 Processing: 116 421 2375.9 177 26593 Waiting: 116 421 2375.9 177 26593 Total: 116 421 2375.9 177 26593
Percentage of the requests served within a certain time (ms) 50% 177 66% 187 75% 194 80% 198 90% 210 95% 223 98% 246 99% 13434 100% 26593 (longest request)
for (peer = rrp->peers->peer, i = 0; i < rrp->peers->init_number; i++) { peer = peer->next; }
flag = 1; for (i = rrp->peers->init_number; i != rrp->peers->init_number || flag; i = (i + 1) % rrp->peers->number, peer = peer->next ? peer->next : rrp->peers->peer) { flag = 0;
#else for (peer = rrp->peers->peer, i = 0; peer; peer = peer->next, i++) { #endif n = i / (8 * sizeof(uintptr_t)); m = (uintptr_t) 1 << i % (8 * sizeof(uintptr_t));
if (rrp->tried[n] & m) { continue; }
if (peer->down) { continue; }
#if (NGX_HTTP_UPSTREAM_CHECK) if (ngx_http_upstream_check_peer_down(peer->check_index)) { continue; } #endif
if (peer->max_fails && peer->fails >= peer->max_fails && now - peer->checked <= peer->fail_timeout) { continue; }
if (peer->max_conns && peer->conns >= peer->max_conns) { continue; }
// 这里加的是effective_weight peer->current_weight += peer->effective_weight; total += peer->effective_weight;
if (peer->effective_weight < peer->weight) { peer->effective_weight++; }
// 选current_weight最大的 if (best == NULL || peer->current_weight > best->current_weight) { best = peer; p = i; } }
if (best == NULL) { returnNULL; }
rrp->current = best;
n = p / (8 * sizeof(uintptr_t)); m = (uintptr_t) 1 << p % (8 * sizeof(uintptr_t));