
Background

In C, memory is allocated and freed with malloc/free; C++ uses new/delete, which are essentially wrappers around malloc/free. The Java virtual machine is itself implemented in C/C++, so it is not fundamentally different either.
Unless told otherwise, a program is linked against glibc's default malloc implementation at build time. As hardware has evolved, however, that malloc has not kept up well with today's multi-core CPUs, so newer implementations have appeared, such as jemalloc (used by FreeBSD) and tcmalloc (used by Golang); note that Netty's direct-memory allocator is also modeled closely on jemalloc. This post walks through jemalloc's implementation, based mainly on version 4.0.3.
Note: jemalloc is reported to scale well under concurrent allocation while matching glibc's malloc on a single CPU.

Overview

Library functions of the malloc family are essentially wrappers around system calls. glibc's malloc raises the heap's upper bound with brk for small requests and uses mmap for large requests, while jemalloc, by default, allocates all of its memory with mmap.
brk and mmap

Environment Setup

The commands below build the jemalloc library and link a test program against it. You can then debug jemalloc with gdb, which makes it much easier to analyze the code as a whole.

# Environment: CentOS 8 with gcc/g++ preinstalled
yum install autoconf automake gdb -y
wget https://github.com/jemalloc/jemalloc/archive/refs/tags/4.0.3.tar.gz
tar zxvf 4.0.3.tar.gz
cd jemalloc-4.0.3
./autogen.sh --enable-debug
make
# For convenience we link jemalloc statically here; dynamic linking is more common in practice
gcc -O0 -g -ggdb -Iinclude dummy.c lib/libjemalloc.a -lpthread -ldl -o dummy
# For dynamic linking use the commands below; the .so must be installed to the lib directory first
# make install
# gcc -O0 -g -ggdb dummy.c -ljemalloc -lpthread -ldl -o dummy
# If the .so cannot be found at run time, add the install directory to LD_LIBRARY_PATH
# LD_LIBRARY_PATH=/usr/local/lib
# export LD_LIBRARY_PATH
# source ~/.bash_profile
gdb ./dummy

With dynamic linking you can verify the setup with ldd. The following code can serve as dummy.c:

#include <stdio.h>
#include <stdlib.h>
#include <jemalloc/jemalloc.h>

int main() {
    printf("start here --------------\n");
    for (int i = 0; i < 5; ++i) {
        char *s = malloc(100);
        s[0] = 'h';
        printf("allocate small %p\n", s);
    }
    printf("finish small allocator\n");
    for (int i = 0; i < 2; ++i) {
        char *s = malloc(5000000);
        printf("allocate large %p\n", s);
    }
    printf("finish here --------------\n");
    return 0;
}

Basic Concepts

size class

First, jemalloc divides memory into size classes. An allocation request is rounded up to the nearest class: a request for 49 bytes, for example, maps to the 64-byte class, so 64 bytes are actually allocated. This simplifies allocation and reduces external fragmentation.

+---------+---------+--------------------------------------+
|Category | Spacing | Size |
+---------+---------+--------------------------------------+
| | lg | [8] |
| +---------+--------------------------------------+
| | 16 | [16, 32, 48, 64, 80, 96, 112, 128] |
| +---------+--------------------------------------+
| | 32 | [160, 192, 224, 256] |
| +---------+--------------------------------------+
| | 64 | [320, 384, 448, 512] |
| +---------+--------------------------------------+
|Small | 128 | [640, 768, 896, 1024] |
| +---------+--------------------------------------+
| | 256 | [1280, 1536, 1792, 2048] |
| +---------+--------------------------------------+
| | 512 | [2560, 3072, 3584, 4096] |
| +---------+--------------------------------------+
| | 1 KiB | [5 KiB, 6 KiB, 7 KiB, 8 KiB] |
| +---------+--------------------------------------+
| | 2 KiB | [10 KiB, 12 KiB, 14 KiB] |
+---------+---------+--------------------------------------+
| | 2 KiB | [16 KiB] |
| +---------+--------------------------------------+
| | 4 KiB | [20 KiB, 24 KiB, 28 KiB, 32 KiB] |
| +---------+--------------------------------------+
| | 8 KiB | [40 KiB, 48 KiB, 54 KiB, 64 KiB] |
| +---------+--------------------------------------+
| | 16 KiB | [80 KiB, 96 KiB, 112 KiB, 128 KiB] |
|Large +---------+--------------------------------------+
| | 32 KiB | [160 KiB, 192 KiB, 224 KiB, 256 KiB] |
| +---------+--------------------------------------+
| | 64 KiB | [320 KiB, 384 KiB, 448 KiB, 512 KiB] |
| +---------+--------------------------------------+
| | 128 KiB | [640 KiB, 768 KiB, 896 KiB, 1 MiB] |
| +---------+--------------------------------------+
| | 256 KiB | [1280 KiB, 1536 KiB, 1792 KiB] |
+---------+---------+--------------------------------------+
| | 256 KiB | [2 MiB] |
| +---------+--------------------------------------+
| | 512 KiB | [2560 KiB, 3 MiB, 3584 KiB, 4 MiB] |
| +---------+--------------------------------------+
| | 1 MiB | [5 MiB, 6 MiB, 7 MiB, 8 MiB] |
| +---------+--------------------------------------+
|Huge | 2 MiB | [10 MiB, 12 MiB, 14 MiB, 16 MiB] |
| +---------+--------------------------------------+
| | 4 MiB | [20 MiB, 24 MiB, 28 MiB, 32 MiB] |
| +---------+--------------------------------------+
| | 8 MiB | [40 MiB, 48 MiB, 56 MiB, 64 MiB] |
| +---------+--------------------------------------+
| | ... | ... |
+---------+---------+--------------------------------------+

Per the table, Small covers sizes up to 14 KiB, Large runs from 16 KiB to 1792 KiB, and anything larger (2 MiB and up) is Huge.

chunk

jemalloc allocates memory in units of chunks. For small and large sizes the chunk size is normally 2 MiB; for huge sizes the chunk is as large as the request, rounded to a multiple of 4 MiB.
A 2 MiB chunk holds 512 pages of 4 KiB each. The chunk uses 13 pages to store its own metadata, leaving 499 pages for allocation. Allocation out of a chunk normally carves out a run, whose size is a whole number of pages; the run is then divided evenly into fixed-size regions. The relationship between chunk, run, and region looks roughly like this:
chunk and run region
The regions in the figure above are only used when allocating small and large memory.

arena

Chunks are in turn managed by an arena: each chunk carries an extent_node_t field pointing back at its arena, and the addresses of the chunk's free runs are tracked in the arena's runs_avail red-black tree.
To make allocation fast for the most common case, the small range — the sizes used most in everyday code — gets 39 bins covering 8 bytes to 14 KiB; each bin keeps a current run (runcur) so memory can be handed out directly when needed. The relationship between arena, chunk, run and bin:
arena
Each bin's bin_info records its region size and run size. Note that bins with different region sizes may share the same run size, because different runs hold different region counts; identical run sizes make it easier to reuse runs across allocation and reclamation.

tcache and tsd

In the scheme above, allocating a small size means fetching the bin from the arena, getting a run from the bin, and then taking a free region out of the run. To speed this up, jemalloc adds a tcache, which keeps a tbin per size class; each tbin's avail array holds regions that are ready to hand out, cutting allocation time.
In addition, each thread gets a tsd (similar to Java's ThreadLocal) caching its arena, tcache and other data so they can be fetched directly during allocation. The arena is bound to the thread, which is also where jemalloc differs from tcmalloc: the latter maintains a pool of arenas, and threads are not bound to a fixed one.
Putting it all together, the overall structure looks like this:
overall

Note that basic internal structures such as the arena itself are allocated through the base allocator, which is not covered here.

Memory Allocation

From a thread's point of view, memory allocation goes through the process shown in the figure.

In code, malloc corresponds to the je_malloc function in jemalloc.c, which starts by fetching the thread-cached tsd via tsd_fetch():

// tsd_get is generated by the malloc_tsd_funcs macro below
malloc_tsd_funcs(JEMALLOC_ALWAYS_INLINE, , tsd_t, tsd_initializer, tsd_cleanup)

JEMALLOC_ALWAYS_INLINE tsd_t *
tsd_fetch(void)
{
    // see the malloc_tsd_funcs definition below
    tsd_t *tsd = tsd_get();

    if (unlikely(tsd->state != tsd_state_nominal)) {
        if (tsd->state == tsd_state_uninitialized) {
            tsd->state = tsd_state_nominal;
            /* Trigger cleanup handler registration. */
            tsd_set(tsd);
        } else if (tsd->state == tsd_state_purgatory) {
            tsd->state = tsd_state_reincarnated;
            tsd_set(tsd);
        } else
            assert(tsd->state == tsd_state_reincarnated);
    }

    return (tsd);
}

#define malloc_tsd_funcs(a_attr, a_name, a_type, a_initializer, \
    a_cleanup) \
/* Get/set. */ \
a_attr a_type * \
a_name##tsd_get(void) \
{ \
 \
    assert(a_name##tsd_booted); \
    return (&a_name##tsd_tls); \
}

// the thread-local cache is declared like this
extern __thread a_type a_name##tsd_tls; \

#define MALLOC_TSD \
/* O(name, type) */ \
O(tcache, tcache_t *) \
O(thread_allocated, uint64_t) \
O(thread_deallocated, uint64_t) \
O(prof_tdata, prof_tdata_t *) \
O(arena, arena_t *) \
O(arenas_cache, arena_t **) \
O(narenas_cache, unsigned) \
O(arenas_cache_bypass, bool) \
O(tcache_enabled, tcache_enabled_t) \
O(quarantine, quarantine_t *) \

#define TSD_INITIALIZER { \
tsd_state_uninitialized, \
NULL, \
0, \
0, \
NULL, \
NULL, \
NULL, \
0, \
false, \
tcache_enabled_default, \
NULL \
}

struct tsd_s {
tsd_state_t state;
#define O(n, t) \
t n;
MALLOC_TSD
#undef O
};

The thread-cached tsd stores the arena, tcache and other per-thread data; it is created on the thread's first allocation and saved into thread-local storage.
As for the number of arenas: a single-core machine gets one arena, while a multi-core machine gets four arenas per core.

if (ncpus > 1)
    opt_narenas = ncpus << 2;
else
    opt_narenas = 1;

When a thread first needs an arena, it picks one by the following rules:

  • If the arena count has not yet reached the limit above, simply create a new one
  • Otherwise choose the arena with the fewest bound threads
/* Slow path, called only by arena_choose(). */
arena_t *
arena_choose_hard(tsd_t *tsd)
{
    arena_t *ret;

    if (narenas_auto > 1) {
        unsigned i, choose, first_null;

        choose = 0;
        first_null = narenas_auto;
        malloc_mutex_lock(&arenas_lock);
        assert(a0get() != NULL);
        for (i = 1; i < narenas_auto; i++) {
            if (arenas[i] != NULL) {
                /*
                 * Choose the first arena that has the lowest
                 * number of threads assigned to it.
                 */
                if (arenas[i]->nthreads <
                    arenas[choose]->nthreads)
                    choose = i;
            } else if (first_null == narenas_auto) {
                /*
                 * Record the index of the first uninitialized
                 * arena, in case all extant arenas are in use.
                 *
                 * NB: It is possible for there to be
                 * discontinuities in terms of initialized
                 * versus uninitialized arenas, due to the
                 * "thread.arena" mallctl.
                 */
                first_null = i;
            }
        }

        if (arenas[choose]->nthreads == 0
            || first_null == narenas_auto) {
            /*
             * Use an unloaded arena, or the least loaded arena if
             * all arenas are already initialized.
             */
            ret = arenas[choose];
        } else {
            /* Initialize a new arena. */
            choose = first_null;
            ret = arena_init_locked(choose);
            if (ret == NULL) {
                malloc_mutex_unlock(&arenas_lock);
                return (NULL);
            }
        }
        arena_bind_locked(tsd, choose);
        malloc_mutex_unlock(&arenas_lock);
    } else {
        ret = a0get();
        arena_bind(tsd, 0);
    }

    return (ret);
}
arena_malloc is then called to allocate the memory:
JEMALLOC_ALWAYS_INLINE void *
arena_malloc(tsd_t *tsd, arena_t *arena, size_t size, bool zero,
    tcache_t *tcache)
{

    assert(size != 0);

    arena = arena_choose(tsd, arena);
    if (unlikely(arena == NULL))
        return (NULL);

    if (likely(size <= SMALL_MAXCLASS)) {
        if (likely(tcache != NULL)) {
            return (tcache_alloc_small(tsd, arena, tcache, size,
                zero));
        } else
            return (arena_malloc_small(arena, size, zero));
    } else if (likely(size <= large_maxclass)) {
        /*
         * Initialize tcache after checking size in order to avoid
         * infinite recursion during tcache initialization.
         */
        if (likely(tcache != NULL) && size <= tcache_maxclass) {
            return (tcache_alloc_large(tsd, arena, tcache, size,
                zero));
        } else
            return (arena_malloc_large(arena, size, zero));
    } else
        return (huge_malloc(tsd, arena, size, zero, tcache));
}
Different size arguments take different paths here, which also confirms that small and large allocations go through the tcache while huge allocations do not.

Small size allocation

First, tcache_alloc_easy tries to allocate from the tcache by popping one region straight off tbin->avail:

JEMALLOC_ALWAYS_INLINE void *
tcache_alloc_easy(tcache_bin_t *tbin)
{
    void *ret;

    if (unlikely(tbin->ncached == 0)) {
        tbin->low_water = -1;
        return (NULL);
    }
    tbin->ncached--;
    if (unlikely((int)tbin->ncached < tbin->low_water))
        tbin->low_water = tbin->ncached;
    ret = tbin->avail[tbin->ncached];
    return (ret);
}

If the tcache is empty, tcache_alloc_small_hard is called, which goes through arena_bin_nonfull_run_get: it first tries to take a non-full run from bin->runs, and if none is free, arena_run_alloc_small carves a run out of a chunk (allocating a new chunk if there is none). The newly allocated run is installed as bin->runcur for later use, and the regions produced along the way are stashed in tcache_bin_t->avail:

void *
tcache_alloc_small_hard(tsd_t *tsd, arena_t *arena, tcache_t *tcache,
    tcache_bin_t *tbin, szind_t binind)
{
    void *ret;

    arena_tcache_fill_small(arena, tbin, binind, config_prof ?
        tcache->prof_accumbytes : 0);
    if (config_prof)
        tcache->prof_accumbytes = 0;
    ret = tcache_alloc_easy(tbin);

    return (ret);
}

void
arena_tcache_fill_small(arena_t *arena, tcache_bin_t *tbin, szind_t binind,
    uint64_t prof_accumbytes)
{
    unsigned i, nfill;
    arena_bin_t *bin;

    assert(tbin->ncached == 0);

    if (config_prof && arena_prof_accum(arena, prof_accumbytes))
        prof_idump();
    bin = &arena->bins[binind];
    malloc_mutex_lock(&bin->lock);
    for (i = 0, nfill = (tcache_bin_info[binind].ncached_max >>
        tbin->lg_fill_div); i < nfill; i++) {
        arena_run_t *run;
        void *ptr;
        // if runcur exists and has free regions, allocate a region from it
        if ((run = bin->runcur) != NULL && run->nfree > 0)
            ptr = arena_run_reg_alloc(run, &arena_bin_info[binind]);
        else
            // no usable run: allocate a run first, then take a region from it
            ptr = arena_bin_malloc_hard(arena, bin);
        if (ptr == NULL) {
            /*
             * OOM.  tbin->avail isn't yet filled down to its first
             * element, so the successful allocations (if any) must
             * be moved to the base of tbin->avail before bailing
             * out.
             */
            if (i > 0) {
                memmove(tbin->avail, &tbin->avail[nfill - i],
                    i * sizeof(void *));
            }
            break;
        }
        if (config_fill && unlikely(opt_junk_alloc)) {
            arena_alloc_junk_small(ptr, &arena_bin_info[binind],
                true);
        }
        /* Insert such that low regions get used first. */
        tbin->avail[nfill - 1 - i] = ptr;
    }
    if (config_stats) {
        bin->stats.nmalloc += i;
        bin->stats.nrequests += tbin->tstats.nrequests;
        bin->stats.curregs += i;
        bin->stats.nfills++;
        tbin->tstats.nrequests = 0;
    }
    malloc_mutex_unlock(&bin->lock);
    tbin->ncached = i;
}

Note that arena_tcache_fill_small fills tbin->avail with this many regions:

tcache_bin_info[binind].ncached_max >> tbin->lg_fill_div

The function that actually allocates a region out of a run:

JEMALLOC_INLINE_C void *
arena_run_reg_alloc(arena_run_t *run, arena_bin_info_t *bin_info)
{
    void *ret;
    unsigned regind;
    arena_chunk_map_misc_t *miscelm;
    void *rpages;

    assert(run->nfree > 0);
    assert(!bitmap_full(run->bitmap, &bin_info->bitmap_info));

    regind = bitmap_sfu(run->bitmap, &bin_info->bitmap_info);
    miscelm = arena_run_to_miscelm(run);
    rpages = arena_miscelm_to_rpages(miscelm);
    ret = (void *)((uintptr_t)rpages + (uintptr_t)bin_info->reg0_offset +
        (uintptr_t)(bin_info->reg_interval * regind));
    run->nfree--;
    return (ret);
}

Large size allocation

If a tcache exists and the size does not exceed the tcache's maximum class, tcache_alloc_large allocates straight from it; internally it just calls tcache_alloc_easy, so there is nothing new to add.
If there is no tcache, the size is too big, or the tcache is empty, arena_run_alloc_large allocates directly from a run; allocating a large run works much like the small case.

Huge size allocation

First an extent_node_t is created to track the chunk about to be allocated, then arena_chunk_alloc_huge is called. It first tries chunk_alloc_cache to reuse a recycled chunk, and falls back to arena_chunk_alloc_huge_hard otherwise. Note that the 2 MiB chunk size mentioned earlier applies to small and large allocations; a huge chunk's size is derived from the requested amount.
Once allocated, a huge chunk is recorded in the arena's huge list for later lookup.

void *
huge_palloc(tsd_t *tsd, arena_t *arena, size_t size, size_t alignment,
bool zero, tcache_t *tcache)
{
void *ret;
size_t usize;
extent_node_t *node;
bool is_zeroed;

/* Allocate one or more contiguous chunks for this request. */

usize = sa2u(size, alignment);
if (unlikely(usize == 0))
return (NULL);
assert(usize >= chunksize);

/* Allocate an extent node with which to track the chunk. */
node = ipallocztm(tsd, CACHELINE_CEILING(sizeof(extent_node_t)),
CACHELINE, false, tcache, true, arena);
if (node == NULL)
return (NULL);

/*
* Copy zero into is_zeroed and pass the copy to chunk_alloc(), so that
* it is possible to make correct junk/zero fill decisions below.
*/
is_zeroed = zero;
arena = arena_choose(tsd, arena);
if (unlikely(arena == NULL) || (ret = arena_chunk_alloc_huge(arena,
size, alignment, &is_zeroed)) == NULL) {
idalloctm(tsd, node, tcache, true);
return (NULL);
}

extent_node_init(node, arena, ret, size, is_zeroed, true);

if (huge_node_set(ret, node)) {
arena_chunk_dalloc_huge(arena, ret, size);
idalloctm(tsd, node, tcache, true);
return (NULL);
}

/* Insert node into huge. */
malloc_mutex_lock(&arena->huge_mtx);
ql_elm_new(node, ql_link);
ql_tail_insert(&arena->huge, node, ql_link);
malloc_mutex_unlock(&arena->huge_mtx);

if (zero || (config_fill && unlikely(opt_zero))) {
if (!is_zeroed)
memset(ret, 0, size);
} else if (config_fill && unlikely(opt_junk_alloc))
memset(ret, 0xa5, size);

return (ret);
}
void *
arena_chunk_alloc_huge(arena_t *arena, size_t usize, size_t alignment,
bool *zero)
{
void *ret;
chunk_hooks_t chunk_hooks = CHUNK_HOOKS_INITIALIZER;
size_t csize = CHUNK_CEILING(usize);

malloc_mutex_lock(&arena->lock);

/* Optimistically update stats. */
if (config_stats) {
arena_huge_malloc_stats_update(arena, usize);
arena->stats.mapped += usize;
}
arena->nactive += (usize >> LG_PAGE);

ret = chunk_alloc_cache(arena, &chunk_hooks, NULL, csize, alignment,
zero, true);
malloc_mutex_unlock(&arena->lock);
if (ret == NULL) {
ret = arena_chunk_alloc_huge_hard(arena, &chunk_hooks, usize,
alignment, zero, csize);
}

if (config_stats && ret != NULL)
stats_cactive_add(usize);
return (ret);
}

Memory Reclamation

The core reclamation code mirrors the allocation code. The chunk is first recovered from the pointer, which works because chunks are allocated at a fixed alignment; the page containing the pointer is then located from the page size, and arena_mapbits_get returns that page's mapbits. The mapbits carry the binind, which encodes a lot of information, including the size class.
Taking small sizes as the example: the freed memory is first pushed back onto the tbin->avail array; if the array grows past tbin_info->ncached_max, tcache_bin_flush_small is called to release entries.

JEMALLOC_ALWAYS_INLINE void
arena_dalloc(tsd_t *tsd, void *ptr, tcache_t *tcache)
{
    arena_chunk_t *chunk;
    size_t pageind, mapbits;

    assert(ptr != NULL);

    chunk = (arena_chunk_t *)CHUNK_ADDR2BASE(ptr);
    if (likely(chunk != ptr)) {
        pageind = ((uintptr_t)ptr - (uintptr_t)chunk) >> LG_PAGE;
        mapbits = arena_mapbits_get(chunk, pageind);
        assert(arena_mapbits_allocated_get(chunk, pageind) != 0);
        if (likely((mapbits & CHUNK_MAP_LARGE) == 0)) {
            /* Small allocation. */
            if (likely(tcache != NULL)) {
                szind_t binind = arena_ptr_small_binind_get(ptr,
                    mapbits);
                tcache_dalloc_small(tsd, tcache, ptr, binind);
            } else {
                arena_dalloc_small(extent_node_arena_get(
                    &chunk->node), chunk, ptr, pageind);
            }
        } else {
            size_t size = arena_mapbits_large_size_get(chunk,
                pageind);

            assert(config_cache_oblivious || ((uintptr_t)ptr &
                PAGE_MASK) == 0);

            if (likely(tcache != NULL) && size - large_pad <=
                tcache_maxclass) {
                tcache_dalloc_large(tsd, tcache, ptr, size -
                    large_pad);
            } else {
                arena_dalloc_large(extent_node_arena_get(
                    &chunk->node), chunk, ptr);
            }
        }
    } else
        huge_dalloc(tsd, ptr, tcache);
}

JEMALLOC_ALWAYS_INLINE void
tcache_dalloc_small(tsd_t *tsd, tcache_t *tcache, void *ptr, szind_t binind)
{
    tcache_bin_t *tbin;
    tcache_bin_info_t *tbin_info;

    assert(tcache_salloc(ptr) <= SMALL_MAXCLASS);

    // junk-fill the freed region with 0x5a so use-after-free bugs are easier to spot
    if (config_fill && unlikely(opt_junk_free))
        arena_dalloc_junk_small(ptr, &arena_bin_info[binind]);

    tbin = &tcache->tbins[binind];
    tbin_info = &tcache_bin_info[binind];
    if (unlikely(tbin->ncached == tbin_info->ncached_max)) {
        tcache_bin_flush_small(tsd, tcache, tbin, binind,
            (tbin_info->ncached_max >> 1));
    }
    assert(tbin->ncached < tbin_info->ncached_max);
    tbin->avail[tbin->ncached] = ptr;
    tbin->ncached++;

    tcache_event(tsd, tcache);
}

When enough small regions have piled up, tcache_bin_flush_small releases memory, with rem set to half of tbin_info->ncached_max. Inside, arena_dalloc_bin_junked_locked does the actual reclamation: arena_run_reg_dalloc first returns each region to its run and increments nfree, and if the run then turns out to be entirely free it can be reclaimed — arena_dissociate_bin_run detaches the run from the chunk, and arena_dalloc_bin_run reclaims it.


static void
arena_dalloc_bin_locked_impl(arena_t *arena, arena_chunk_t *chunk, void *ptr,
    arena_chunk_map_bits_t *bitselm, bool junked)
{
    size_t pageind, rpages_ind;
    arena_run_t *run;
    arena_bin_t *bin;
    arena_bin_info_t *bin_info;
    szind_t binind;

    pageind = ((uintptr_t)ptr - (uintptr_t)chunk) >> LG_PAGE;
    rpages_ind = pageind - arena_mapbits_small_runind_get(chunk, pageind);
    run = &arena_miscelm_get(chunk, rpages_ind)->run;
    binind = run->binind;
    bin = &arena->bins[binind];
    bin_info = &arena_bin_info[binind];

    if (!junked && config_fill && unlikely(opt_junk_free))
        arena_dalloc_junk_small(ptr, bin_info);

    arena_run_reg_dalloc(run, ptr);
    if (run->nfree == bin_info->nregs) {
        arena_dissociate_bin_run(chunk, run, bin);
        arena_dalloc_bin_run(arena, chunk, run, bin);
    } else if (run->nfree == 1 && run != bin->runcur)
        arena_bin_lower_run(arena, chunk, run, bin);

    if (config_stats) {
        bin->stats.ndalloc++;
        bin->stats.curregs--;
    }
}

This path ends in arena_run_dalloc, which first inserts the run into the arena's runs_avail. If the whole chunk turns out to be empty, arena_chunk_dalloc reclaims the chunk via chunk_dalloc_cache; eventually this reaches arena_maybe_purge and, in the end, chunk_dalloc_mmap, which calls munmap to return the memory to the OS.

static void
arena_run_dalloc(arena_t *arena, arena_run_t *run, bool dirty, bool cleaned,
bool decommitted)
{
arena_chunk_t *chunk;
arena_chunk_map_misc_t *miscelm;
size_t size, run_ind, run_pages, flag_dirty, flag_decommitted;

chunk = (arena_chunk_t *)CHUNK_ADDR2BASE(run);
miscelm = arena_run_to_miscelm(run);
run_ind = arena_miscelm_to_pageind(miscelm);
assert(run_ind >= map_bias);
assert(run_ind < chunk_npages);
size = arena_run_size_get(arena, chunk, run, run_ind);
run_pages = (size >> LG_PAGE);
arena_cactive_update(arena, 0, run_pages);
arena->nactive -= run_pages;

/*
* The run is dirty if the caller claims to have dirtied it, as well as
* if it was already dirty before being allocated and the caller
* doesn't claim to have cleaned it.
*/
assert(arena_mapbits_dirty_get(chunk, run_ind) ==
arena_mapbits_dirty_get(chunk, run_ind+run_pages-1));
if (!cleaned && !decommitted && arena_mapbits_dirty_get(chunk, run_ind)
!= 0)
dirty = true;
flag_dirty = dirty ? CHUNK_MAP_DIRTY : 0;
flag_decommitted = decommitted ? CHUNK_MAP_DECOMMITTED : 0;

/* Mark pages as unallocated in the chunk map. */
if (dirty || decommitted) {
size_t flags = flag_dirty | flag_decommitted;
arena_mapbits_unallocated_set(chunk, run_ind, size, flags);
arena_mapbits_unallocated_set(chunk, run_ind+run_pages-1, size,
flags);
} else {
arena_mapbits_unallocated_set(chunk, run_ind, size,
arena_mapbits_unzeroed_get(chunk, run_ind));
arena_mapbits_unallocated_set(chunk, run_ind+run_pages-1, size,
arena_mapbits_unzeroed_get(chunk, run_ind+run_pages-1));
}

arena_run_coalesce(arena, chunk, &size, &run_ind, &run_pages,
flag_dirty, flag_decommitted);

/* Insert into runs_avail, now that coalescing is complete. */
assert(arena_mapbits_unallocated_size_get(chunk, run_ind) ==
arena_mapbits_unallocated_size_get(chunk, run_ind+run_pages-1));
assert(arena_mapbits_dirty_get(chunk, run_ind) ==
arena_mapbits_dirty_get(chunk, run_ind+run_pages-1));
assert(arena_mapbits_decommitted_get(chunk, run_ind) ==
arena_mapbits_decommitted_get(chunk, run_ind+run_pages-1));
arena_avail_insert(arena, chunk, run_ind, run_pages);

if (dirty)
arena_run_dirty_insert(arena, chunk, run_ind, run_pages);

/* Deallocate chunk if it is now completely unused. */
if (size == arena_maxrun) {
assert(run_ind == map_bias);
assert(run_pages == (arena_maxrun >> LG_PAGE));
arena_chunk_dalloc(arena, chunk);
}

/*
* It is okay to do dirty page processing here even if the chunk was
* deallocated above, since in that case it is the spare. Waiting
* until after possible chunk deallocation to do dirty processing
* allows for an old spare to be fully deallocated, thus decreasing the
* chances of spuriously crossing the dirty page purging threshold.
*/
if (dirty)
arena_maybe_purge(arena);
}
const chunk_hooks_t chunk_hooks_default = {
chunk_alloc_default,
chunk_dalloc_default,
chunk_commit_default,
chunk_decommit_default,
chunk_purge_default,
chunk_split_default,
chunk_merge_default
};

pages_unmap

Summary

  • Some tsd and arena get/set functions, including the tsd lookup itself, are generated by macros
  • The code uses namespace-style prefixes, so jump-to-definition often fails; searching the whole tree for the function name together with its parameter types finds definitions quickly
  • Some functions are emitted with a je_ prefix after compilation

Reference

  1. A Scalable Concurrent malloc(3) Implementation for FreeBSD - jemalloc
  2. jemalloc source code analysis (Chinese)
  3. jemalloc

Preparing the Debug Environment

Debug the kafka.tools.ConsoleProducer class directly, with the arguments below:

--broker-list 127.0.0.1:9092 --topic dummy

Make sure a Broker is running first.

Overview of the Message Send Path

Sending a message eventually reaches KafkaProducer.send. The important steps are choosing a partition, appending the message to the accumulator, and then waking up the Sender:

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;

// first make sure the metadata for the topic is available
ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
Cluster cluster = clusterAndWaitTime.cluster;
byte[] serializedKey;
try {
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
" specified in key.serializer");
}
byte[] serializedValue;
try {
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
" specified in value.serializer");
}
int partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);

setReadOnly(record.headers());
Header[] headers = record.headers().toArray();

int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
compressionType, serializedKey, serializedValue, headers);
ensureValidRecordSize(serializedSize);
long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
// producer callback will make sure to call both 'callback' and interceptor callback
Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);

if (transactionManager != null && transactionManager.isTransactional())
transactionManager.maybeAddPartitionToTransaction(tp);

RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs);
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}
return result.future;
// handling exceptions and record the errors;
// for API exceptions return them in the future,
// for other exceptions throw directly
}

Sender is a Runnable, connected to the outside world through the accumulator. On one side it calls Sender.sendProducerData to send messages; on the other it calls NetworkClient.poll to perform the NIO reads and writes, handled by the code below. Note that before polling it calls metadataUpdater.maybeUpdate(now), which ensures the connection to the Broker is alive and establishes it if not:

public List<ClientResponse> poll(long timeout, long now) {
if (!abortedSends.isEmpty()) {
// If there are aborted sends because of unsupported version exceptions or disconnects,
// handle them immediately without waiting for Selector#poll.
List<ClientResponse> responses = new ArrayList<>();
handleAbortedSends(responses);
completeResponses(responses);
return responses;
}

long metadataTimeout = metadataUpdater.maybeUpdate(now);
try {
this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
} catch (IOException e) {
log.error("Unexpected error during I/O", e);
}

// process completed actions
long updatedNow = this.time.milliseconds();
List<ClientResponse> responses = new ArrayList<>();
handleCompletedSends(responses, updatedNow);
handleCompletedReceives(responses, updatedNow);
handleDisconnections(responses, updatedNow);
handleConnections();
handleInitiateApiVersionRequests(updatedNow);
handleTimedOutRequests(responses, updatedNow);
completeResponses(responses);

return responses;
}

private void handleCompletedReceives(List<ClientResponse> responses, long now) {
for (NetworkReceive receive : this.selector.completedReceives()) {
String source = receive.source();
InFlightRequest req = inFlightRequests.completeNext(source);
Struct responseStruct = parseStructMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header,
throttleTimeSensor, now);
if (log.isTraceEnabled()) {
log.trace("Completed receive from node {} for {} with correlation id {}, received {}", req.destination,
ApiKeys.forId(req.header.apiKey()), req.header.correlationId(), responseStruct);
}
AbstractResponse body = createResponse(responseStruct, req.header);
if (req.isInternalRequest && body instanceof MetadataResponse)
metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body);
else if (req.isInternalRequest && body instanceof ApiVersionsResponse)
handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) body);
else
responses.add(req.completed(body, now));
}
}

Responses to ApiKeys.METADATA and ApiKeys.API_VERSIONS requests are handled directly. METADATA updates a topic's information — its partition count, the leader partition's address, which Brokers are in the ISR, and so on — while API_VERSIONS returns the minimum and maximum version supported for each API command. Other responses, typically ApiKeys.PRODUCE, are handled through the RequestCompletionHandler callback below:

private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
if (batches.isEmpty())
return;

Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());

// find the minimum magic version used when creating the record sets
byte minUsedMagic = apiVersions.maxUsableProduceMagic();
for (ProducerBatch batch : batches) {
if (batch.magic() < minUsedMagic)
minUsedMagic = batch.magic();
}

for (ProducerBatch batch : batches) {
TopicPartition tp = batch.topicPartition;
MemoryRecords records = batch.records();

// down convert if necessary to the minimum magic used. In general, there can be a delay between the time
// that the producer starts building the batch and the time that we send the request, and we may have
// chosen the message format based on out-dated metadata. In the worst case, we optimistically chose to use
// the new message format, but found that the broker didn't support it, so we need to down-convert on the
// client before sending. This is intended to handle edge cases around cluster upgrades where brokers may
// not all support the same message format version. For example, if a partition migrates from a broker
// which is supporting the new magic version to one which doesn't, then we will need to convert.
if (!records.hasMatchingMagic(minUsedMagic))
records = batch.records().downConvert(minUsedMagic, 0);
produceRecordsByPartition.put(tp, records);
recordsByPartition.put(tp, batch);
}

String transactionalId = null;
if (transactionManager != null && transactionManager.isTransactional()) {
transactionalId = transactionManager.transactionalId();
}
ProduceRequest.Builder requestBuilder = new ProduceRequest.Builder(minUsedMagic, acks, timeout,
produceRecordsByPartition, transactionalId);
RequestCompletionHandler callback = new RequestCompletionHandler() {
public void onComplete(ClientResponse response) {
handleProduceResponse(response, recordsByPartition, time.milliseconds());
}
};

String nodeId = Integer.toString(destination);
ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0, callback);
client.send(clientRequest, now);
log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
}

One more small detail worth noting: Kafka's produce requests are sent to the leader of each topic partition. We know the leaders from the METADATA response, and before sending we also check connectivity to each leader and connect if necessary. This happens in Sender.java's sendProducerData method, which calls client.ready; internally that checks the connection state and calls initiateConnect to establish a connection if needed.

// Sender.java
private long sendProducerData(long now) {
Cluster cluster = metadata.fetch();
// get the list of partitions with data ready to send
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

// remove any nodes we aren't ready to send to
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = iter.next();
if (!this.client.ready(node, now)) {
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
}
}

// ... drain batches for the ready nodes (elided) ...
sendProduceRequests(batches, now);

return pollTimeout;
}

// RecordAccumulator.java: put the leaders of sendable partitions into readyNodes
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
Set<Node> readyNodes = new HashSet<>();
long nextReadyCheckDelayMs = Long.MAX_VALUE;
Set<String> unknownLeaderTopics = new HashSet<>();

boolean exhausted = this.free.queued() > 0;
for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
TopicPartition part = entry.getKey();
Deque<ProducerBatch> deque = entry.getValue();

Node leader = cluster.leaderFor(part);
synchronized (deque) {
if (leader == null && !deque.isEmpty()) {
// This is a partition for which leader is not known, but messages are available to send.
// Note that entries are currently not removed from batches when deque is empty.
unknownLeaderTopics.add(part.topic());
} else if (!readyNodes.contains(leader) && !muted.contains(part)) {
ProducerBatch batch = deque.peekFirst();
if (batch != null) {
long waitedTimeMs = batch.waitedTimeMs(nowMs);
boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
boolean full = deque.size() > 1 || batch.isFull();
boolean expired = waitedTimeMs >= timeToWaitMs;
boolean sendable = full || expired || exhausted || closed || flushInProgress();
if (sendable && !backingOff) {
readyNodes.add(leader);
} else {
long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
// Note that this results in a conservative estimate since an un-sendable partition may have
// a leader that will later be found to have sendable data. However, this is good enough
// since we'll just wake up and then sleep again for the remaining time.
nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
}
}
}
}
}

return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}

Summary

Overall, message sending roughly follows this flow, assuming the first attempt has not yet reached the topic partition's leader broker:
(figure: kafka producer)

Reference

  1. A Guide To The Kafka Protocol

Background

Apache Kafka uses a TimingWheel internally in place of java.util.concurrent.DelayQueue and java.util.Timer, mainly to reduce the O(log n) insert/delete complexity to O(1).
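To see why insertion is O(1), here is a minimal single-level wheel sketch in Java (a hypothetical illustration, not Kafka's code): the bucket index is computed directly from the expiration time by division and modulo, so adding a task is a constant-time operation.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Minimal single-level timing wheel (hypothetical sketch, not Kafka's code).
class SimpleTimingWheel {
    private final long tickMs;            // time span of one bucket
    private final int wheelSize;          // number of buckets
    private final long currentTime;       // rounded down to a multiple of tickMs
    private final Deque<Long>[] buckets;  // each bucket holds expiration timestamps

    @SuppressWarnings("unchecked")
    SimpleTimingWheel(long tickMs, int wheelSize, long startMs) {
        this.tickMs = tickMs;
        this.wheelSize = wheelSize;
        this.currentTime = startMs - (startMs % tickMs);
        this.buckets = new Deque[wheelSize];
        for (int i = 0; i < wheelSize; i++) buckets[i] = new ArrayDeque<>();
    }

    // Returns the bucket index the task lands in, or -1 if already expired.
    // A real implementation would delegate to an overflow wheel when the
    // expiration exceeds tickMs * wheelSize.
    int add(long expirationMs) {
        if (expirationMs < currentTime + tickMs) return -1;  // already expired
        long virtualId = expirationMs / tickMs;              // absolute tick number
        int idx = (int) (virtualId % wheelSize);             // O(1) bucket lookup
        buckets[idx].add(expirationMs);
        return idx;
    }
}
```

The key point is that no ordered structure is consulted on insert; ordering only matters when a whole bucket expires.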

Task insertion

Overall, TimingWheel is only a few dozen lines of code; the core is shown below.

private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, taskCounter: AtomicInteger, queue: DelayQueue[TimerTaskList]) {

private[this] val interval = tickMs * wheelSize
private[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => new TimerTaskList(taskCounter) }

private[this] var currentTime = startMs - (startMs % tickMs) // rounding down to multiple of tickMs

// overflowWheel can potentially be updated and read by two concurrent threads through add().
// Therefore, it needs to be volatile due to the issue of Double-Checked Locking pattern with JVM
@volatile private[this] var overflowWheel: TimingWheel = null

private[this] def addOverflowWheel(): Unit = {
synchronized {
if (overflowWheel == null) {
overflowWheel = new TimingWheel(
tickMs = interval,
wheelSize = wheelSize,
startMs = currentTime,
taskCounter = taskCounter,
queue
)
}
}
}

def add(timerTaskEntry: TimerTaskEntry): Boolean = {
val expiration = timerTaskEntry.expirationMs

if (timerTaskEntry.cancelled) {
// Cancelled
false
} else if (expiration < currentTime + tickMs) {
// Already expired
false
} else if (expiration < currentTime + interval) {
// Put in its own bucket
val virtualId = expiration / tickMs
val bucket = buckets((virtualId % wheelSize.toLong).toInt)
bucket.add(timerTaskEntry)

// Set the bucket expiration time
if (bucket.setExpiration(virtualId * tickMs)) {
// The bucket needs to be enqueued because it was an expired bucket
// We only need to enqueue the bucket when its expiration time has changed, i.e. the wheel has advanced
// and the previous buckets gets reused; further calls to set the expiration within the same wheel cycle
// will pass in the same value and hence return false, thus the bucket with the same expiration will not
// be enqueued multiple times.
queue.offer(bucket)
}
true
} else {
// Out of the interval. Put it into the parent timer
if (overflowWheel == null) addOverflowWheel()
overflowWheel.add(timerTaskEntry)
}
}

// Try to advance the clock
def advanceClock(timeMs: Long): Unit = {
if (timeMs >= currentTime + tickMs) {
currentTime = timeMs - (timeMs % tickMs)

// Try to advance the clock of the overflow wheel if present
if (overflowWheel != null) overflowWheel.advanceClock(currentTime)
}
}
}

Adding a task corresponds to the add method above, and the logic is straightforward: if the task is already cancelled or expired, it is rejected; if it falls within the first wheel, it is added to the linked list of the corresponding bucket; if it falls outside the first wheel's range, it is recursively added to overflowWheel. This works like carrying in positional arithmetic: overflowWheel keeps adding layers as the delay grows by multiples of the interval.

Task execution

Execution is driven by SystemTimer: its advanceClock method is called in a loop by a dedicated thread, which polls a bucket from delayQueue (the same queue used inside TimingWheel). Once a bucket is polled, bucket.flush walks the task list of that wheel slot: each entry is first removed from the list and then re-added via addTimerTaskEntry. If re-adding fails and the task was not cancelled, it has expired, so it is submitted to the taskExecutor thread pool, which has a single thread.

def advanceClock(timeoutMs: Long): Boolean = {
var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
if (bucket != null) {
writeLock.lock()
try {
while (bucket != null) {
timingWheel.advanceClock(bucket.getExpiration())
bucket.flush(reinsert)
bucket = delayQueue.poll()
}
} finally {
writeLock.unlock()
}
true
} else {
false
}
}

private[this] val reinsert = (timerTaskEntry: TimerTaskEntry) => addTimerTaskEntry(timerTaskEntry)

private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {
if (!timingWheel.add(timerTaskEntry)) {
// Already expired or cancelled
if (!timerTaskEntry.cancelled)
taskExecutor.submit(timerTaskEntry.timerTask)
}
}

The entry point for the above is the expirationReaper field of DelayedOperationPurgatory, which creates a dedicated thread to run advanceClock, which in turn calls the SystemTimer.advanceClock method described above. The whole execution runs on its own thread and does not block the threads adding tasks, but the logic on this thread is fairly involved, so trading O(log n) for O(1) is not free.

private class ExpiredOperationReaper extends ShutdownableThread(
"ExpirationReaper-%d-%s".format(brokerId, purgatoryName),
false) {

override def doWork() {
advanceClock(200L)
}
}

def advanceClock(timeoutMs: Long) {
timeoutTimer.advanceClock(timeoutMs)

// Trigger a purge if the number of completed but still being watched operations is larger than
// the purge threshold. That number is computed by the difference btw the estimated total number of
// operations and the number of pending delayed operations.
if (estimatedTotalOperations.get - delayed > purgeInterval) {
// now set estimatedTotalOperations to delayed (the number of pending operations) since we are going to
// clean up watchers. Note that, if more operations are completed during the clean up, we may end up with
// a little overestimated total number of operations.
estimatedTotalOperations.getAndSet(delayed)
debug("Begin purging watch lists")
val purged = allWatchers.map(_.purgeCompleted()).sum
debug("Purged %d elements from watch lists.".format(purged))
}
}

Summary

  • Many articles online explain how tasks are distributed across the wheel levels, but few explain how they are actually executed.

Reference

  1. Kafka技术内幕样章 层级时间轮

1. Install dependencies

Gradle, like Maven, manages project dependencies. Scala compiles to Java class files, but is more concise to write than Java and supports more language features.

## gradle
https://gradle.org/releases/
wget https://downloads.gradle-dn.com/distributions/gradle-3.5-all.zip
unzip gradle-3.5-all.zip

## scala
wget https://downloads.lightbend.com/scala/2.13.7/scala-2.13.7.tgz
tar zxvf scala-2.13.7.tgz

## zookeeper
https://zookeeper.apache.org/releases.html
wget https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper-3.5.9/apache-zookeeper-3.5.9-bin.tar.gz
tar zxvf apache-zookeeper-3.5.9-bin.tar.gz

For convenience, add both to PATH (e.g. in ~/.bash_profile). Also, before starting ZooKeeper, copy conf/zoo_sample.cfg to conf/zoo.cfg, then start it with ./bin/zkServer.sh start.

2. Download the Kafka source code

Go to the download page and pick a suitable version.

https://kafka.apache.org/downloads
wget https://archive.apache.org/dist/kafka/0.11.0.2/kafka-0.11.0.2-src.tgz
tar zxvf kafka-0.11.0.2-src.tgz
cd kafka-0.11.0.2-src
gradle idea

Gradle can be very slow here; edit the repositories block inside buildscript in build.gradle and add the Aliyun mirror:

maven{url 'http://maven.aliyun.com/nexus/content/groups/public/'}

Then simply open the project in IDEA.

Reference

  1. gradle换源

Redis HA Architecture

In my experience, Redis HA (High Availability) is usually implemented with Replication + Redis Sentinel. Our company's Redis architecture looks roughly like this:

(figure: redis ha architecture)

The key points: Twemproxy acts as a proxy; when a Redis master fails, a monitoring program obtains the new master's address and fails requests over to it, so upper layers never notice the failure, and it also reduces the number of TCP connections to the Redis servers. The Redis servers use master-slave replication: one master and two slaves, plus N sentinels, where N is odd (three in the diagram for simplicity). Every sentinel connects to all master and slave nodes, detects failures via periodic PINGs, and on failure runs an election to choose a new leader/master. Note that in a real production environment there may be many Twemproxy instances.

How Does Redis Sentinel Fail Over?

The whole HA setup hinges on two things: slaves continuously replicating the master's writes, and Redis Sentinel switching the master on failure. The former is easy to implement and understand; the latter is more interesting, so let's walk through it in detail with some references and the source code.

Step 1: Detect the failure

Sentinel periodically calls sentinel.c:sentinelHandleRedisInstance, which, inside sentinel.c:sentinelSendPeriodicCommands, calls sentinelSendPing to send a PING command to each monitored Redis node. When a reply is received, the callback sets link->act_ping_time to 0 to mark the PING as acknowledged; note that this whole process is asynchronous.

int sentinelSendPing(sentinelRedisInstance *ri) {
int retval = redisAsyncCommand(ri->link->cc,
sentinelPingReplyCallback, ri, "%s",
sentinelInstanceMapCommand(ri,"PING"));
if (retval == C_OK) {
ri->link->pending_commands++;
ri->link->last_ping_time = mstime();
/* We update the active ping time only if we received the pong for
* the previous ping, otherwise we are technically waiting since the
* first ping that did not receive a reply. */
if (ri->link->act_ping_time == 0)
ri->link->act_ping_time = ri->link->last_ping_time;
return 1;
} else {
return 0;
}
}

Later, sentinelCheckSubjectivelyDown updates the node's state: it decides whether the MASTER node is subjectively down by checking elapsed > ri->down_after_period, and if so sets the SRI_S_DOWN flag. Here down_after_period is a fixed value configured at startup, and elapsed = mstime() - ri->link->act_ping_time. The act_ping_time field is maintained by the PING code above: a successful PONG clears it in the callback, and the next sentinelSendPing sets it back to last_ping_time, i.e. the current time.

void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
mstime_t elapsed = 0;

if (ri->link->act_ping_time)
elapsed = mstime() - ri->link->act_ping_time;
else if (ri->link->disconnected)
elapsed = mstime() - ri->link->last_avail_time;

// ... 省略一些代码

/* Update the SDOWN flag. We believe the instance is SDOWN if:
*
* 1) It is not replying.
* 2) We believe it is a master, it reports to be a slave for enough time
* to meet the down_after_period, plus enough time to get two times
* INFO report from the instance. */
if (elapsed > ri->down_after_period ||
(ri->flags & SRI_MASTER &&
ri->role_reported == SRI_SLAVE &&
mstime() - ri->role_reported_time >
(ri->down_after_period+SENTINEL_INFO_PERIOD*2)))
{
/* Is subjectively down */
if ((ri->flags & SRI_S_DOWN) == 0) {
sentinelEvent(LL_WARNING,"+sdown",ri,"%@");
ri->s_down_since_time = mstime();
ri->flags |= SRI_S_DOWN;
}
} else {
/* Is subjectively up */
if (ri->flags & SRI_S_DOWN) {
sentinelEvent(LL_WARNING,"-sdown",ri,"%@");
ri->flags &= ~(SRI_S_DOWN|SRI_SCRIPT_KILL_SENT);
}
}
}

Subjectively down (SRI_S_DOWN) means the current sentinel believes the Redis node is dead. But it could also be a network problem on the sentinel's own side preventing it from reaching the node, so the node must become objectively down (SRI_O_DOWN) before the failure is truly confirmed.

Step 2: Confirm the failure

The periodic task calls sentinelCheckObjectivelyDown. The process is simple: it tallies all sentinels' judgments about the node, and if at least quorum sentinels consider it subjectively down, the node is marked objectively down (SRI_O_DOWN) and an event is published.

void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
dictIterator *di;
dictEntry *de;
unsigned int quorum = 0, odown = 0;

if (master->flags & SRI_S_DOWN) {
/* Is down for enough sentinels? */
quorum = 1; /* the current sentinel. */
/* Count all the other sentinels. */
di = dictGetIterator(master->sentinels);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);

if (ri->flags & SRI_MASTER_DOWN) quorum++;
}
dictReleaseIterator(di);
if (quorum >= master->quorum) odown = 1;
}

/* Set the flag accordingly to the outcome. */
if (odown) {
if ((master->flags & SRI_O_DOWN) == 0) {
sentinelEvent(LL_WARNING,"+odown",master,"%@ #quorum %d/%d",
quorum, master->quorum);
master->flags |= SRI_O_DOWN;
master->o_down_since_time = mstime();
}
} else {
if (master->flags & SRI_O_DOWN) {
sentinelEvent(LL_WARNING,"-odown",master,"%@");
master->flags &= ~SRI_O_DOWN;
}
}
}

One step is missing above: how does the current sentinel collect the other sentinels' judgments about a node? It sends the is-master-down-by-addr command from sentinelAskMasterStateToOtherSentinels. The other sentinels handle the request in sentinelCommand and reply with their own judgment (whether they consider the node subjectively down). The reply is processed in the callback sentinelReceiveIsMasterDownReply, which updates the node's state by setting the SRI_MASTER_DOWN flag.

Step 3: Elect the Sentinel leader

The periodic task calls sentinelStartFailoverIfNeeded to decide whether to initiate a failover. The main conditions are that the node is objectively down, no failover is already in progress, and enough time has passed since the last failover. If they hold, the function below starts the failover.

void sentinelStartFailover(sentinelRedisInstance *master) {
serverAssert(master->flags & SRI_MASTER);

master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START;
master->flags |= SRI_FAILOVER_IN_PROGRESS;
master->failover_epoch = ++sentinel.current_epoch;
sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu",
(unsigned long long) sentinel.current_epoch);
sentinelEvent(LL_WARNING,"+try-failover",master,"%@");
master->failover_start_time = mstime()+rand()%SENTINEL_MAX_DESYNC;
master->failover_state_change_time = mstime();
}

The function above updates some state; in particular it sets failover_state to SENTINEL_FAILOVER_STATE_WAIT_START, which a state machine in the periodic task then processes:

void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
serverAssert(ri->flags & SRI_MASTER);

if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return;

switch(ri->failover_state) {
case SENTINEL_FAILOVER_STATE_WAIT_START:
sentinelFailoverWaitStart(ri);
break;
case SENTINEL_FAILOVER_STATE_SELECT_SLAVE:
sentinelFailoverSelectSlave(ri);
break;
case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE:
sentinelFailoverSendSlaveOfNoOne(ri);
break;
case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION:
sentinelFailoverWaitPromotion(ri);
break;
case SENTINEL_FAILOVER_STATE_RECONF_SLAVES:
sentinelFailoverReconfNextSlave(ri);
break;
}
}

Execution therefore enters the sentinelFailoverWaitStart function above, which calls sentinelGetLeader:

char *sentinelGetLeader(sentinelRedisInstance *master, uint64_t epoch) {
dict *counters;
dictIterator *di;
dictEntry *de;
unsigned int voters = 0, voters_quorum;
char *myvote;
char *winner = NULL;
uint64_t leader_epoch;
uint64_t max_votes = 0;

serverAssert(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS));
counters = dictCreate(&leaderVotesDictType,NULL);

voters = dictSize(master->sentinels)+1; /* All the other sentinels and me.*/

/* Count other sentinels votes */
di = dictGetIterator(master->sentinels);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);
if (ri->leader != NULL && ri->leader_epoch == sentinel.current_epoch)
sentinelLeaderIncr(counters,ri->leader);
}
dictReleaseIterator(di);

/* Check what's the winner. For the winner to win, it needs two conditions:
* 1) Absolute majority between voters (50% + 1).
* 2) And anyway at least master->quorum votes. */
di = dictGetIterator(counters);
while((de = dictNext(di)) != NULL) {
uint64_t votes = dictGetUnsignedIntegerVal(de);

if (votes > max_votes) {
max_votes = votes;
winner = dictGetKey(de);
}
}
dictReleaseIterator(di);

/* Count this Sentinel vote:
* if this Sentinel did not voted yet, either vote for the most
* common voted sentinel, or for itself if no vote exists at all. */
if (winner)
myvote = sentinelVoteLeader(master,epoch,winner,&leader_epoch);
else
myvote = sentinelVoteLeader(master,epoch,sentinel.myid,&leader_epoch);

if (myvote && leader_epoch == epoch) {
uint64_t votes = sentinelLeaderIncr(counters,myvote);

if (votes > max_votes) {
max_votes = votes;
winner = myvote;
}
}

voters_quorum = voters/2+1;
if (winner && (max_votes < voters_quorum || max_votes < master->quorum))
winner = NULL;

winner = winner ? sdsnew(winner) : NULL;
sdsfree(myvote);
dictRelease(counters);
return winner;
}

The overall process tallies the leader chosen by each sentinel and elects the one with the most votes. The earlier is-master-down-by-addr exchange already collected information from the other sentinels, including the leader and epoch they voted for, which is reused in the leader election above.
After the leader is elected, failover_state is changed to SENTINEL_FAILOVER_STATE_SELECT_SLAVE.
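The two winning conditions — an absolute majority of voters plus at least master->quorum votes — can be sketched as follows (a hypothetical Java rendering of the counting logic, not Redis source):

```java
import java.util.Map;

// Winner check from sentinelGetLeader, sketched in Java (hypothetical).
class LeaderVote {
    // votes: candidate runid -> number of votes received this epoch
    // voters: total number of sentinels (the others plus this one)
    static String getLeader(Map<String, Integer> votes, int voters, int quorum) {
        String winner = null;
        int maxVotes = 0;
        for (Map.Entry<String, Integer> e : votes.entrySet()) {
            if (e.getValue() > maxVotes) {
                maxVotes = e.getValue();
                winner = e.getKey();
            }
        }
        int votersQuorum = voters / 2 + 1;  // absolute majority (50% + 1)
        if (winner != null && (maxVotes < votersQuorum || maxVotes < quorum))
            winner = null;                  // not enough votes: no leader this epoch
        return winner;
    }
}
```

Note that growing the voter set raises the majority bar: two votes elect a leader among three sentinels but not among five.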

Step 4: Pick the candidate for the new cluster master

In the state machine, SENTINEL_FAILOVER_STATE_SELECT_SLAVE triggers sentinelFailoverSelectSlave, which in turn calls sentinelSelectSlave:

sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master) {
sentinelRedisInstance **instance =
zmalloc(sizeof(instance[0])*dictSize(master->slaves));
sentinelRedisInstance *selected = NULL;
int instances = 0;
dictIterator *di;
dictEntry *de;
mstime_t max_master_down_time = 0;

if (master->flags & SRI_S_DOWN)
max_master_down_time += mstime() - master->s_down_since_time;
max_master_down_time += master->down_after_period * 10;

di = dictGetIterator(master->slaves);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *slave = dictGetVal(de);
mstime_t info_validity_time;

if (slave->flags & (SRI_S_DOWN|SRI_O_DOWN)) continue;
if (slave->link->disconnected) continue;
if (mstime() - slave->link->last_avail_time > SENTINEL_PING_PERIOD*5) continue;
if (slave->slave_priority == 0) continue;

/* If the master is in SDOWN state we get INFO for slaves every second.
* Otherwise we get it with the usual period so we need to account for
* a larger delay. */
if (master->flags & SRI_S_DOWN)
info_validity_time = SENTINEL_PING_PERIOD*5;
else
info_validity_time = SENTINEL_INFO_PERIOD*3;
if (mstime() - slave->info_refresh > info_validity_time) continue;
if (slave->master_link_down_time > max_master_down_time) continue;
instance[instances++] = slave;
}
dictReleaseIterator(di);
if (instances) {
qsort(instance,instances,sizeof(sentinelRedisInstance*),
compareSlavesForPromotion);
selected = instance[0];
}
zfree(instance);
return selected;
}

Slaves are first filtered by hard conditions, e.g. they must not be subjectively or objectively down. The surviving candidates are then sorted with quicksort using the comparison function compareSlavesForPromotion:

int compareSlavesForPromotion(const void *a, const void *b) {
sentinelRedisInstance **sa = (sentinelRedisInstance **)a,
**sb = (sentinelRedisInstance **)b;
char *sa_runid, *sb_runid;

if ((*sa)->slave_priority != (*sb)->slave_priority)
return (*sa)->slave_priority - (*sb)->slave_priority;

/* If priority is the same, select the slave with greater replication
* offset (processed more data from the master). */
if ((*sa)->slave_repl_offset > (*sb)->slave_repl_offset) {
return -1; /* a < b */
} else if ((*sa)->slave_repl_offset < (*sb)->slave_repl_offset) {
return 1; /* a > b */
}

/* If the replication offset is the same select the slave with that has
* the lexicographically smaller runid. Note that we try to handle runid
* == NULL as there are old Redis versions that don't publish runid in
* INFO. A NULL runid is considered bigger than any other runid. */
sa_runid = (*sa)->runid;
sb_runid = (*sb)->runid;
if (sa_runid == NULL && sb_runid == NULL) return 0;
else if (sa_runid == NULL) return 1; /* a > b */
else if (sb_runid == NULL) return -1; /* a < b */
return strcasecmp(sa_runid, sb_runid);
}

In short, promotion is decided by these three criteria, in order:

  • lower slave_priority
  • bigger processed replication offset (the slave likely holds more data and lags the master less)
  • lexicographically smaller runid (the instance's run ID)

The chosen candidate is saved in promoted_slave, and the state is changed to SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE.

Step 5: Promote the candidate to the new master

The rest is driven by the state machine. SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE maps to sentinelFailoverSendSlaveOfNoOne, whose main job is to call sentinelSendSlaveOf to send a SLAVEOF command to the promoted slave (SLAVEOF NO ONE, turning it into a master):

int sentinelSendSlaveOf(sentinelRedisInstance *ri, char *host, int port) {
char portstr[32];
int retval;

// ...
/* In order to send SLAVEOF in a safe way, we send a transaction performing
* the following tasks:
* 1) Reconfigure the instance according to the specified host/port params.
* 2) Rewrite the configuration.
* 3) Disconnect all clients (but this one sending the commnad) in order
* to trigger the ask-master-on-reconnection protocol for connected
* clients.
*
* Note that we don't check the replies returned by commands, since we
* will observe instead the effects in the next INFO output. */
retval = redisAsyncCommand(ri->link->cc,
sentinelDiscardReplyCallback, ri, "%s",
sentinelInstanceMapCommand(ri,"MULTI"));
if (retval == C_ERR) return retval;
ri->link->pending_commands++;

retval = redisAsyncCommand(ri->link->cc,
sentinelDiscardReplyCallback, ri, "%s %s %s",
sentinelInstanceMapCommand(ri,"SLAVEOF"),
host, portstr);
if (retval == C_ERR) return retval;
ri->link->pending_commands++;

retval = redisAsyncCommand(ri->link->cc,
sentinelDiscardReplyCallback, ri, "%s REWRITE",
sentinelInstanceMapCommand(ri,"CONFIG"));
if (retval == C_ERR) return retval;
ri->link->pending_commands++;

/* CLIENT KILL TYPE <type> is only supported starting from Redis 2.8.12,
* however sending it to an instance not understanding this command is not
* an issue because CLIENT is variadic command, so Redis will not
* recognized as a syntax error, and the transaction will not fail (but
* only the unsupported command will fail). */
retval = redisAsyncCommand(ri->link->cc,
sentinelDiscardReplyCallback, ri, "%s KILL TYPE normal",
sentinelInstanceMapCommand(ri,"CLIENT"));
if (retval == C_ERR) return retval;
ri->link->pending_commands++;

retval = redisAsyncCommand(ri->link->cc,
sentinelDiscardReplyCallback, ri, "%s",
sentinelInstanceMapCommand(ri,"EXEC"));
if (retval == C_ERR) return retval;
ri->link->pending_commands++;

return C_OK;
}

Note that the commands are wrapped in MULTI and EXEC, i.e. a Redis transaction, which ensures they execute as a unit. The key command is the SLAVEOF sent to the promoted_slave node selected in the previous step. Once this completes, the state machine sets the state to SENTINEL_FAILOVER_STATE_WAIT_PROMOTION, whose handler sentinelFailoverWaitPromotion merely checks for a timeout:

void sentinelFailoverWaitPromotion(sentinelRedisInstance *ri) {
/* Just handle the timeout. Switching to the next state is handled
* by the function parsing the INFO command of the promoted slave. */
if (mstime() - ri->failover_state_change_time > ri->failover_timeout) {
sentinelEvent(LL_WARNING,"-failover-abort-slave-timeout",ri,"%@");
sentinelAbortFailover(ri);
}
}

Later, while processing INFO replies, sentinelRefreshInstanceInfo notices when a node we believe is a slave reports the master role, which means the SLAVEOF command has taken effect. It updates some state and advances the state machine to SENTINEL_FAILOVER_STATE_RECONF_SLAVES. The corresponding handler sentinelFailoverReconfNextSlave iterates over the remaining slaves and sends them sentinelSendSlaveOf (pointing at the new master), with some rate limiting so that reconfiguring too many slaves at once does not hurt clients. At the end it calls sentinelFailoverDetectEnd to check progress; once all nodes are reconfigured, the state advances to SENTINEL_FAILOVER_STATE_UPDATE_CONFIG, handled via sentinelFailoverSwitchToPromotedSlave, which calls sentinelResetMasterAndChangeAddress, mainly to update the local view of the master and slaves.

Summary

Key point: it is a Redis node that fails, but the election is held among the sentinels over who handles that node's failover. Within a given epoch a sentinel votes only once (and afterwards answers every asker with that same vote). On a tie, the sentinels wait a random delay and start another round of voting.
Key steps throughout the process are published via Pub/Sub.

sentinel.conf only configures the IP and port of the master nodes to monitor; slave information arrives later in INFO replies, at which point a SentinelRedisInstance structure is created for each slave. Sentinels also discover each other by subscribing to the HELLO channel on the master and slave nodes. Connections are not established as soon as instances are created; instead, the link is checked and connected right before communication is actually needed.
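The one-vote-per-epoch rule above can be sketched like this (hypothetical code mirroring the idea behind sentinelVoteLeader, not its actual implementation):

```java
// One vote per epoch (hypothetical sketch): once a sentinel has voted in an
// epoch, it answers every later request in that epoch with the same vote.
class EpochVoter {
    private long leaderEpoch = 0;
    private String leader = null;

    String voteLeader(long reqEpoch, String candidate) {
        if (reqEpoch > leaderEpoch) {  // first request seen in a newer epoch wins
            leaderEpoch = reqEpoch;
            leader = candidate;
        }
        return leader;                 // same answer for the rest of the epoch
    }
}
```

This is why a tie cannot be broken within the same epoch: every sentinel's answer is frozen until a new epoch starts.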

Reference

  1. Rafe Understandable Distributed Consensus
  2. Redis源码解析:21sentinel(二)定期发送消息、检测主观下线
  3. Redis源码解析:22sentinel(三)客观下线以及故障转移之选举领导节点
  4. Redis源码解析:23sentinel(四)故障转移流程
  5. Redis Sentinel Documentation
  6. Redis Sentinel — High Availability: Everything you need to know from DEV to PROD: Complete Guide

Introduction

Redis offers two persistence mechanisms: RDB and AOF.

AOF

Every command execution goes through server.c:call, which calls propagate to handle AOF and replication. The AOF part first decides whether the command should be logged (i.e. appendonly is enabled and the command needs logging), then aof.c:feedAppendOnlyFile appends the command's AOF representation to server.aof_buf (in memory). aof.c:flushAppendOnlyFile later writes server.aof_buf to the AOF file via aof.c:aofWrite, which uses the write system call; all of this happens within a single ae loop iteration. Afterwards aof.c:aof_background_fsync enqueues the AOF file into the bio_jobs array, and a background thread processes the job in bio.c:bioProcessBackgroundJobs, calling fsync to flush the changes to disk.
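The buffer-then-flush pattern described above can be illustrated with a toy sketch (hypothetical Java, not Redis code): commands are serialized into an in-memory buffer, and the whole buffer is written out in one call, with fsync deferred to a background job.

```java
import java.nio.charset.StandardCharsets;

// Toy sketch of the AOF write path (hypothetical, not Redis code).
class AofBuffer {
    private final StringBuilder buf = new StringBuilder();   // stands in for server.aof_buf
    private final StringBuilder file = new StringBuilder();  // stands in for the AOF file

    // feedAppendOnlyFile: serialize the command into the buffer (RESP framing)
    void feed(String... argv) {
        buf.append("*").append(argv.length).append("\r\n");
        for (String a : argv) {
            int len = a.getBytes(StandardCharsets.UTF_8).length;
            buf.append("$").append(len).append("\r\n").append(a).append("\r\n");
        }
    }

    // flushAppendOnlyFile: one write of the whole buffer, then clear it
    int flush() {
        int n = buf.length();
        file.append(buf);
        buf.setLength(0);
        return n;  // bytes "written"; a real fsync would happen on a bio thread
    }

    String fileContents() { return file.toString(); }
}
```

A SET k v command, for instance, is framed as `*3\r\n$3\r\nSET\r\n$1\r\nk\r\n$1\r\nv\r\n` before being flushed.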

void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
sds buf = sdsempty();
robj *tmpargv[3];

/* The DB this command was targeting is not the same as the last command
* we appended. To issue a SELECT command is needed. */
if (dictid != server.aof_selected_db) {
char seldb[64];

snprintf(seldb,sizeof(seldb),"%d",dictid);
buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
(unsigned long)strlen(seldb),seldb);
server.aof_selected_db = dictid;
}

if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
cmd->proc == expireatCommand) {
/* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */
buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
} else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) {
/* Translate SETEX/PSETEX to SET and PEXPIREAT */
tmpargv[0] = createStringObject("SET",3);
tmpargv[1] = argv[1];
tmpargv[2] = argv[3];
buf = catAppendOnlyGenericCommand(buf,3,tmpargv);
decrRefCount(tmpargv[0]);
buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
} else if (cmd->proc == setCommand && argc > 3) {
int i;
robj *exarg = NULL, *pxarg = NULL;
/* Translate SET [EX seconds][PX milliseconds] to SET and PEXPIREAT */
buf = catAppendOnlyGenericCommand(buf,3,argv);
for (i = 3; i < argc; i ++) {
if (!strcasecmp(argv[i]->ptr, "ex")) exarg = argv[i+1];
if (!strcasecmp(argv[i]->ptr, "px")) pxarg = argv[i+1];
}
serverAssert(!(exarg && pxarg));
if (exarg)
buf = catAppendOnlyExpireAtCommand(buf,server.expireCommand,argv[1],
exarg);
if (pxarg)
buf = catAppendOnlyExpireAtCommand(buf,server.pexpireCommand,argv[1],
pxarg);
} else {
/* All the other commands don't need translation or need the
* same translation already operated in the command vector
* for the replication itself. */
buf = catAppendOnlyGenericCommand(buf,argc,argv);
}

/* Append to the AOF buffer. This will be flushed on disk just before
* of re-entering the event loop, so before the client will get a
* positive reply about the operation performed. */
if (server.aof_state == AOF_ON)
// Key line: append the serialized command to aof_buf
server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));

/* If a background append only file rewriting is in progress we want to
* accumulate the differences between the child DB and the current one
* in a buffer, so that when the child process will do its work we
* can append the differences to the new append only file. */
if (server.aof_child_pid != -1)
aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));

sdsfree(buf);
}

void flushAppendOnlyFile(int force) {
ssize_t nwritten;
int sync_in_progress = 0;
mstime_t latency;

if (sdslen(server.aof_buf) == 0) {
/* Check if we need to do fsync even the aof buffer is empty,
* because previously in AOF_FSYNC_EVERYSEC mode, fsync is
* called only when aof buffer is not empty, so if users
* stop write commands before fsync called in one second,
* the data in page cache cannot be flushed in time. */
if (server.aof_fsync == AOF_FSYNC_EVERYSEC &&
server.aof_fsync_offset != server.aof_current_size &&
server.unixtime > server.aof_last_fsync &&
!(sync_in_progress = aofFsyncInProgress())) {
goto try_fsync;
} else {
return;
}
}

if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
sync_in_progress = aofFsyncInProgress();

if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
/* With this append fsync policy we do background fsyncing.
* If the fsync is still in progress we can try to delay
* the write for a couple of seconds. */
if (sync_in_progress) {
if (server.aof_flush_postponed_start == 0) {
/* No previous write postponing, remember that we are
* postponing the flush and return. */
server.aof_flush_postponed_start = server.unixtime;
return;
} else if (server.unixtime - server.aof_flush_postponed_start < 2) {
/* We were already waiting for fsync to finish, but for less
* than two seconds this is still ok. Postpone again. */
return;
}
/* Otherwise fall trough, and go write since we can't wait
* over two seconds. */
server.aof_delayed_fsync++;
serverLog(LL_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
}
}
/* We want to perform a single write. This should be guaranteed atomic
* at least if the filesystem we are writing is a real physical one.
* While this will save us against the server being killed I don't think
* there is much to do about the whole server stopping for power problems
* or alike */

latencyStartMonitor(latency);
// Write server.aof_buf to the AOF file via the write(2) system call
nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
latencyEndMonitor(latency);
/* We want to capture different events for delayed writes:
* when the delay happens with a pending fsync, or with a saving child
* active, and when the above two conditions are missing.
* We also use an additional event name to save all samples which is
* useful for graphing / monitoring purposes. */
if (sync_in_progress) {
latencyAddSampleIfNeeded("aof-write-pending-fsync",latency);
} else if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) {
latencyAddSampleIfNeeded("aof-write-active-child",latency);
} else {
latencyAddSampleIfNeeded("aof-write-alone",latency);
}
latencyAddSampleIfNeeded("aof-write",latency);

/* We performed the write so reset the postponed flush sentinel to zero. */
server.aof_flush_postponed_start = 0;

if (nwritten != (ssize_t)sdslen(server.aof_buf)) {
static time_t last_write_error_log = 0;
int can_log = 0;

/* Limit logging rate to 1 line per AOF_WRITE_LOG_ERROR_RATE seconds. */
if ((server.unixtime - last_write_error_log) > AOF_WRITE_LOG_ERROR_RATE) {
can_log = 1;
last_write_error_log = server.unixtime;
}

/* Log the AOF write error and record the error code. */
if (nwritten == -1) {
if (can_log) {
serverLog(LL_WARNING,"Error writing to the AOF file: %s",
strerror(errno));
server.aof_last_write_errno = errno;
}
} else {
if (can_log) {
serverLog(LL_WARNING,"Short write while writing to "
"the AOF file: (nwritten=%lld, "
"expected=%lld)",
(long long)nwritten,
(long long)sdslen(server.aof_buf));
}

if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {
if (can_log) {
serverLog(LL_WARNING, "Could not remove short write "
"from the append-only file. Redis may refuse "
"to load the AOF the next time it starts. "
"ftruncate: %s", strerror(errno));
}
} else {
/* If the ftruncate() succeeded we can set nwritten to
* -1 since there is no longer partial data into the AOF. */
nwritten = -1;
}
server.aof_last_write_errno = ENOSPC;
}

/* Handle the AOF write error. */
if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
/* We can't recover when the fsync policy is ALWAYS since the
* reply for the client is already in the output buffers, and we
* have the contract with the user that on acknowledged write data
* is synced on disk. */
serverLog(LL_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting...");
exit(1);
} else {
/* Recover from failed write leaving data into the buffer. However
* set an error to stop accepting writes as long as the error
* condition is not cleared. */
server.aof_last_write_status = C_ERR;

/* Trim the sds buffer if there was a partial write, and there
* was no way to undo it with ftruncate(2). */
if (nwritten > 0) {
server.aof_current_size += nwritten;
sdsrange(server.aof_buf,nwritten,-1);
}
return; /* We'll try again on the next call... */
}
} else {
/* Successful write(2). If AOF was in error state, restore the
* OK state and log the event. */
if (server.aof_last_write_status == C_ERR) {
serverLog(LL_WARNING,
"AOF write error looks solved, Redis can write again.");
server.aof_last_write_status = C_OK;
}
}
server.aof_current_size += nwritten;

/* Re-use AOF buffer when it is small enough. The maximum comes from the
* arena size of 4k minus some overhead (but is otherwise arbitrary). */
if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
sdsclear(server.aof_buf);
} else {
sdsfree(server.aof_buf);
server.aof_buf = sdsempty();
}

try_fsync:
/* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
* children doing I/O in the background. */
if (server.aof_no_fsync_on_rewrite &&
(server.aof_child_pid != -1 || server.rdb_child_pid != -1))
return;

/* Perform the fsync if needed. */
if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
/* redis_fsync is defined as fdatasync() for Linux in order to avoid
* flushing metadata. */
latencyStartMonitor(latency);
redis_fsync(server.aof_fd); /* Let's try to get this data on the disk */
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("aof-fsync-always",latency);
server.aof_fsync_offset = server.aof_current_size;
server.aof_last_fsync = server.unixtime;
} else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
server.unixtime > server.aof_last_fsync)) {
if (!sync_in_progress) {
// Queue a background fsync job
aof_background_fsync(server.aof_fd);
server.aof_fsync_offset = server.aof_current_size;
}
server.aof_last_fsync = server.unixtime;
}
}

void *bioProcessBackgroundJobs(void *arg) {
struct bio_job *job;
unsigned long type = (unsigned long) arg;
sigset_t sigset;

/* Check that the type is within the right interval. */
if (type >= BIO_NUM_OPS) {
serverLog(LL_WARNING,
"Warning: bio thread started with wrong type %lu",type);
return NULL;
}

/* Make the thread killable at any time, so that bioKillThreads()
* can work reliably. */
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);

pthread_mutex_lock(&bio_mutex[type]);
/* Block SIGALRM so we are sure that only the main thread will
* receive the watchdog signal. */
sigemptyset(&sigset);
sigaddset(&sigset, SIGALRM);
if (pthread_sigmask(SIG_BLOCK, &sigset, NULL))
serverLog(LL_WARNING,
"Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno));

while(1) {
listNode *ln;

/* The loop always starts with the lock hold. */
if (listLength(bio_jobs[type]) == 0) {
pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]);
continue;
}
/* Pop the job from the queue. */
ln = listFirst(bio_jobs[type]);
job = ln->value;
/* It is now possible to unlock the background system as we know have
* a stand alone job structure to process.*/
pthread_mutex_unlock(&bio_mutex[type]);

/* Process the job accordingly to its type. */
if (type == BIO_CLOSE_FILE) {
close((long)job->arg1);
} else if (type == BIO_AOF_FSYNC) {
// Perform the actual fsync
redis_fsync((long)job->arg1);
} else if (type == BIO_LAZY_FREE) {
/* What we free changes depending on what arguments are set:
* arg1 -> free the object at pointer.
* arg2 & arg3 -> free two dictionaries (a Redis DB).
* only arg3 -> free the skiplist. */
if (job->arg1)
lazyfreeFreeObjectFromBioThread(job->arg1);
else if (job->arg2 && job->arg3)
lazyfreeFreeDatabaseFromBioThread(job->arg2,job->arg3);
else if (job->arg3)
lazyfreeFreeSlotsMapFromBioThread(job->arg3);
} else {
serverPanic("Wrong job type in bioProcessBackgroundJobs().");
}
zfree(job);

/* Lock again before reiterating the loop, if there are no longer
* jobs to process we'll block again in pthread_cond_wait(). */
pthread_mutex_lock(&bio_mutex[type]);
listDelNode(bio_jobs[type],ln);
bio_pending[type]--;

/* Unblock threads blocked on bioWaitStepOfType() if any. */
pthread_cond_broadcast(&bio_step_cond[type]);
}
}

Beyond that, since AOF files tend to grow large, Redis compacts them (the code calls this rewrite), which will be covered in a later post.
AOF is disabled by default; it can be enabled in the config file:

appendonly yes

# The name of the append only file (default: "appendonly.aof")
appendfilename "appendonly.aof"

Or enable it at runtime with:

config set appendonly yes

RDB

The serverCron function in server.c calls rdbSaveBackground, which forks a child process that writes a snapshot of the database to disk.

int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
pid_t childpid;
long long start;

if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;

server.dirty_before_bgsave = server.dirty;
server.lastbgsave_try = time(NULL);
openChildInfoPipe();

start = ustime();
if ((childpid = fork()) == 0) {
int retval;

/* Child */
closeListeningSockets(0);
redisSetProcTitle("redis-rdb-bgsave");
retval = rdbSave(filename,rsi);
if (retval == C_OK) {
size_t private_dirty = zmalloc_get_private_dirty(-1);

if (private_dirty) {
serverLog(LL_NOTICE,
"RDB: %zu MB of memory used by copy-on-write",
private_dirty/(1024*1024));
}

server.child_info_data.cow_size = private_dirty;
sendChildInfo(CHILD_INFO_TYPE_RDB);
}
exitFromChild((retval == C_OK) ? 0 : 1);
} else {
/* Parent */
server.stat_fork_time = ustime()-start;
server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
if (childpid == -1) {
closeChildInfoPipe();
server.lastbgsave_status = C_ERR;
serverLog(LL_WARNING,"Can't save in background: fork: %s",
strerror(errno));
return C_ERR;
}
serverLog(LL_NOTICE,"Background saving started by pid %d",childpid);
server.rdb_save_time_start = time(NULL);
server.rdb_child_pid = childpid;
server.rdb_child_type = RDB_CHILD_TYPE_DISK;
updateDictResizePolicy();
return C_OK;
}
return C_OK; /* unreached */
}

int rdbSave(char *filename, rdbSaveInfo *rsi) {
char tmpfile[256];
char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
FILE *fp;
rio rdb;
int error = 0;

snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
fp = fopen(tmpfile,"w");
if (!fp) {
char *cwdp = getcwd(cwd,MAXPATHLEN);
serverLog(LL_WARNING,
"Failed opening the RDB file %s (in server root dir %s) "
"for saving: %s",
filename,
cwdp ? cwdp : "unknown",
strerror(errno));
return C_ERR;
}

rioInitWithFile(&rdb,fp);

if (server.rdb_save_incremental_fsync)
rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES);

if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE,rsi) == C_ERR) {
errno = error;
goto werr;
}

/* Make sure data will not remain on the OS's output buffers */
if (fflush(fp) == EOF) goto werr;
if (fsync(fileno(fp)) == -1) goto werr;
if (fclose(fp) == EOF) goto werr;

/* Use RENAME to make sure the DB file is changed atomically only
* if the generate DB file is ok. */
if (rename(tmpfile,filename) == -1) {
char *cwdp = getcwd(cwd,MAXPATHLEN);
serverLog(LL_WARNING,
"Error moving temp DB file %s on the final "
"destination %s (in server root dir %s): %s",
tmpfile,
filename,
cwdp ? cwdp : "unknown",
strerror(errno));
unlink(tmpfile);
return C_ERR;
}

serverLog(LL_NOTICE,"DB saved on disk");
server.dirty = 0;
server.lastsave = time(NULL);
server.lastbgsave_status = C_OK;
return C_OK;

werr:
serverLog(LL_WARNING,"Write error saving DB on disk: %s", strerror(errno));
fclose(fp);
unlink(tmpfile);
return C_ERR;
}

Summary

AOF and RDB can be enabled at the same time.
The write system call only copies data into the kernel's in-memory cache for the file; it does not guarantee the data reaches the disk immediately. fsync flushes those in-memory changes to disk, ensuring the data survives a power failure.

Reference

  1. AOF rewrite analysis
  2. fsync

Introduction

Redis can serve as both a store and a cache. When it is used as a cache and memory usage reaches the configured maximum (server.maxmemory), keys must be evicted using an algorithm such as LRU or LFU.

Key eviction policies

There are several eviction policies, but they fall into two groups: the VOLATILE ones only consider keys that have an expiration set, while the ALLKEYS ones consider every key.

#define MAXMEMORY_FLAG_LRU (1<<0)
#define MAXMEMORY_FLAG_LFU (1<<1)
#define MAXMEMORY_FLAG_ALLKEYS (1<<2)
#define MAXMEMORY_FLAG_NO_SHARED_INTEGERS \
(MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU)

#define MAXMEMORY_VOLATILE_LRU ((0<<8)|MAXMEMORY_FLAG_LRU)
#define MAXMEMORY_VOLATILE_LFU ((1<<8)|MAXMEMORY_FLAG_LFU)
#define MAXMEMORY_VOLATILE_TTL (2<<8)
#define MAXMEMORY_VOLATILE_RANDOM (3<<8)
#define MAXMEMORY_ALLKEYS_LRU ((4<<8)|MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_ALLKEYS)
#define MAXMEMORY_ALLKEYS_LFU ((5<<8)|MAXMEMORY_FLAG_LFU|MAXMEMORY_FLAG_ALLKEYS)
#define MAXMEMORY_ALLKEYS_RANDOM ((6<<8)|MAXMEMORY_FLAG_ALLKEYS)
#define MAXMEMORY_NO_EVICTION (7<<8)

Each Redis db maintains both views: dict holds all key-value pairs, while expires holds the keys that have a timeout set, mapped to their expiration time.

typedef struct redisDb {
dict *dict; /* The keyspace for this DB */
dict *expires; /* Timeout of keys with a timeout set */
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/
dict *ready_keys; /* Blocked keys that received a PUSH */
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
int id; /* Database ID */
long long avg_ttl; /* Average TTL, just for stats */
list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
} redisDb;

LRU here is the familiar Least Recently Used, LFU is Least Frequently Used, and TTL evicts based on expiration time. LFU deserves a closer look, because Redis' LFU does not expire keys purely on access frequency.

LFU

First, let's look at the structs that hold the eviction metadata.

// Corresponds to the val field of a dictEntry
/*
* 16 bits 8 bits
* +----------------+--------+
* + Last decr time | LOG_C |
* +----------------+--------+
*/
typedef struct redisObject {
unsigned type:4;
unsigned encoding:4;
unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or
* LFU data (least significant 8 bits frequency
* and most significant 16 bits access time). */
int refcount;
void *ptr;
} robj;

typedef struct dictEntry {
void *key;
union {
void *val;
uint64_t u64;
int64_t s64;
double d;
} v;
struct dictEntry *next;
} dictEntry;

LRU_BITS is 24. For LFU, the high 16 bits hold the current time in minutes (derived from server.unixtime) modulo 0xFFFF, which roughly encodes the access time; note that this clock wraps around about every 45.5 days. The remaining 8 bits hold a counter.
Next, let's look at how this field is updated and read. Every time a key is accessed, the method below refreshes the lru field. The high 16 bits come straight from the current time (from server.unixtime, in minutes); the low 8-bit counter is subtler: it is first decayed according to how much time has elapsed, and then incremented logarithmically (the increment only happens with a certain probability, and the higher the access frequency already recorded, the lower that probability).

void updateLFU(robj *val) {
unsigned long counter = LFUDecrAndReturn(val);
counter = LFULogIncr(counter);
val->lru = (LFUGetTimeInMinutes()<<8) | counter;
}
unsigned long LFUDecrAndReturn(robj *o) {
unsigned long ldt = o->lru >> 8;
unsigned long counter = o->lru & 255;
unsigned long num_periods = server.lfu_decay_time ? LFUTimeElapsed(ldt) / server.lfu_decay_time : 0;
if (num_periods)
counter = (num_periods > counter) ? 0 : counter - num_periods;
return counter;
}
uint8_t LFULogIncr(uint8_t counter) {
if (counter == 255) return 255;
double r = (double)rand()/RAND_MAX;
double baseval = counter - LFU_INIT_VAL;
if (baseval < 0) baseval = 0;
double p = 1.0/(baseval*server.lfu_log_factor+1);
if (r < p) counter++;
return counter;
}

server.lfu_decay_time defaults to 1, meaning LFUDecrAndReturn decrements the low 8-bit counter by one for every elapsed minute. LFULogIncr then increments the counter with a probability controlled by server.lfu_log_factor (default 10): the larger the current counter, the smaller the probability. The result is stored back as the new counter.
Personally I think Redis' LFU expiry is not entirely rigorous: since the 16-bit timestamp wraps about every 45.5 days, a key inserted 45.5 days after another carries the same timestamp, so at eviction time they look identical. That seems a bit unfair; arguably newer keys should have higher priority and be harder to evict. Still, the distinctive part of this LFU is precisely that it carries the access time in the high 16 bits.

When keys need to be evicted, evict.c:freeMemoryIfNeeded is invoked, which eventually calls evictionPoolPopulate. That function randomly samples N keys from the relevant hash table and scores each of them according to the eviction policy.

void evictionPoolPopulate(int dbid, dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) {
int j, k, count;
dictEntry *samples[server.maxmemory_samples];

count = dictGetSomeKeys(sampledict,samples,server.maxmemory_samples);
for (j = 0; j < count; j++) {
unsigned long long idle;
sds key;
robj *o;
dictEntry *de;

de = samples[j];
key = dictGetKey(de);

/* If the dictionary we are sampling from is not the main
* dictionary (but the expires one) we need to lookup the key
* again in the key dictionary to obtain the value object. */
if (server.maxmemory_policy != MAXMEMORY_VOLATILE_TTL) {
if (sampledict != keydict) de = dictFind(keydict, key);
o = dictGetVal(de);
}

/* Calculate the idle time according to the policy. This is called
* idle just because the code initially handled LRU, but is in fact
* just a score where an higher score means better candidate. */
if (server.maxmemory_policy & MAXMEMORY_FLAG_LRU) {
idle = estimateObjectIdleTime(o);
} else if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
/* When we use an LRU policy, we sort the keys by idle time
* so that we expire keys starting from greater idle time.
* However when the policy is an LFU one, we have a frequency
* estimation, and we want to evict keys with lower frequency
* first. So inside the pool we put objects using the inverted
* frequency subtracting the actual frequency to the maximum
* frequency of 255. */
idle = 255-LFUDecrAndReturn(o);
} else if (server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL) {
/* In this case the sooner the expire the better. */
idle = ULLONG_MAX - (long)dictGetVal(de);
} else {
serverPanic("Unknown eviction policy in evictionPoolPopulate()");
}

/* Insert the element inside the pool.
* First, find the first empty bucket or the first populated
* bucket that has an idle time smaller than our idle time. */
k = 0;
while (k < EVPOOL_SIZE &&
pool[k].key &&
pool[k].idle < idle) k++;
if (k == 0 && pool[EVPOOL_SIZE-1].key != NULL) {
/* Can't insert if the element is < the worst element we have
* and there are no empty buckets. */
continue;
} else if (k < EVPOOL_SIZE && pool[k].key == NULL) {
/* Inserting into empty position. No setup needed before insert. */
} else {
/* Inserting in the middle. Now k points to the first element
* greater than the element to insert. */
if (pool[EVPOOL_SIZE-1].key == NULL) {
/* Free space on the right? Insert at k shifting
* all the elements from k to end to the right. */

/* Save SDS before overwriting. */
sds cached = pool[EVPOOL_SIZE-1].cached;
memmove(pool+k+1,pool+k,
sizeof(pool[0])*(EVPOOL_SIZE-k-1));
pool[k].cached = cached;
} else {
/* No free space on right? Insert at k-1 */
k--;
/* Shift all elements on the left of k (included) to the
* left, so we discard the element with smaller idle time. */
sds cached = pool[0].cached; /* Save SDS before overwriting. */
if (pool[0].key != pool[0].cached) sdsfree(pool[0].key);
memmove(pool,pool+1,sizeof(pool[0])*k);
pool[k].cached = cached;
}
}

/* Try to reuse the cached SDS string allocated in the pool entry,
* because allocating and deallocating this object is costly
* (according to the profiler, not my fantasy. Remember:
* premature optimizbla bla bla bla. */
int klen = sdslen(key);
if (klen > EVPOOL_CACHED_SDS_SIZE) {
pool[k].key = sdsdup(key);
} else {
memcpy(pool[k].cached,key,klen+1);
sdssetlen(pool[k].cached,klen);
pool[k].key = pool[k].cached;
}
pool[k].idle = idle;
pool[k].dbid = dbid;
}
}

server.maxmemory_samples keys are sampled per round and one is chosen for eviction; for LFU, for example, the score is computed like this:

unsigned long LFUDecrAndReturn(robj *o) {
unsigned long ldt = o->lru >> 8;
unsigned long counter = o->lru & 255;
unsigned long num_periods = server.lfu_decay_time ? LFUTimeElapsed(ldt) / server.lfu_decay_time : 0;
if (num_periods)
counter = (num_periods > counter) ? 0 : counter - num_periods;
return counter;
}

The low 8-bit counter, decayed by the number of minutes the key has gone unaccessed, serves as the score: the lower it is, the better the eviction candidate. The loop keeps evicting until memory usage drops back under the configured maximum.
That is reasonable in practice, because when the limit is exceeded there usually aren't that many keys to evict (assuming new keys aren't arriving too quickly).

Summary

One more thing to note: when choosing keys to evict, Redis always randomly samples a fixed number of keys (server.maxmemory_samples) and filters among those.
In our production setup we configure memory alerts but do not set maxmemory, so keys are never evicted and user reads stay fast; if the cached data set grows very large, adding an expiration strategy is worth considering.

Reference

  1. Redis
  2. Redis as an LRU cache

Introduction

I recently worked on something involving gRPC. Reading the code and running some tests showed that gRPC uses HTTP/2.0 as its underlying transport, and HTTP/2.0 turns out to have some nice properties.

Investigation

gRPC creates stubs from a ManagedChannel, and calls to the gRPC server go through the stub. Usually, to run calls concurrently you would keep a connection pool, reusing connections rather than constantly opening new ones. But the company's wrapper around ManagedChannel contains no pooling at all, and searching online only turned up advice that a single channel per host and port is enough.
So I figured ManagedChannel must be doing this internally, but I couldn't confirm it from test code. The test uses a thread pool to fire n requests at once:

public class ClientMain {
public static void main(String[] args) throws InterruptedException {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8080)
.usePlaintext()
.userAgent("dan-gcpc")
.build();

HelloServiceGrpc.HelloServiceBlockingStub stub = HelloServiceGrpc.newBlockingStub(channel);

ExecutorService executorService = Executors.newCachedThreadPool();
executorService.submit(() -> System.out.println(stub.hello(HelloRequest.newBuilder()
.setFirstName("Wood")
.setLastName("Dan 1")
.build())));
executorService.submit(() -> System.out.println(stub.hello(HelloRequest.newBuilder()
.setFirstName("Wood")
.setLastName("Dan 2")
.build())));
executorService.submit(() -> System.out.println(stub.hello(HelloRequest.newBuilder()
.setFirstName("Wood")
.setLastName("Dan 3")
.build())));
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.HOURS);

channel.shutdown();
}
}

The log output shows that every request went over the same socket:

13:26:10.830 [grpc-nio-worker-ELG-1-2] DEBUG io.grpc.netty.NettyClientHandler - 
[id: 0x10c09f8f, L:/127.0.0.1:51915 - R:localhost/127.0.0.1:8080] INBOUND DATA: streamId=7 padding=0 endStream=false length=24 bytes=00000000130a1148656c6c6f2c20576f6f642044616e2033
greeting: "Hello, Wood Dan 2"

13:26:10.831 [grpc-nio-worker-ELG-1-2] DEBUG io.grpc.netty.NettyClientHandler -
[id: 0x10c09f8f, L:/127.0.0.1:51915 - R:localhost/127.0.0.1:8080] INBOUND HEADERS: streamId=11 headers=GrpcHttp2ResponseHeaders[grpc-status: 0] streamDependency=0 weight=16 exclusive=false padding=0 endStream=true
greeting: "Hello, Wood Dan 3"

This seemed strange, so I ran netstat on one of our production machines: there, too, each host and port pair had exactly one socket. That contradicts connection pooling, which normally opens n connections to the same host and port; with a pool, netstat shows many sockets with the same destination but different local ports.

The twist

Completely out of ideas, I simply ran tcpdump on the traffic to see how the requests differed, and spotted the keyword HTTP/2.0:

01:10:50.959964 IP (tos 0x0, ttl 64, id 0, offset 0, flags [DF], proto TCP (6), length 122, bad cksum 0 (->3c7c)!)
localhost.65376 > localhost.http-alt: Flags [P.], cksum 0xfe6e (incorrect -> 0xa146), seq 1:71, ack 41, win 6379, options [nop,nop,TS val 1105197772 ecr 1105197733], length 70: HTTP
E..z..@.@............`....2.P.L......n.....
A...A...PRI * HTTP/2.0

SM

............................... ..............

That was the answer. A quick search confirmed that HTTP/2.0 supports multiplexing: a single socket carries multiple logical connections, so no connection pool is needed, because n requests can be in flight concurrently on one socket. The mechanism is that the application layer wraps requests into Frames, multiple Frames make up a Stream, a Stream corresponds to one logical connection, and each Frame carries a StreamId to tell the logical connections apart.

HTTP/2.0 features

Besides the multiplexing described above, HTTP/2.0 offers a number of other features (all taken from the second reference):

  1. Header compression: only headers that changed are sent each time; unchanged ones are not repeated.
  2. Request prioritization, including weights and dependency settings.
  3. Server Push (the server proactively sends data to the client, e.g. for notifications).
  4. Flow Control.
    The second reference explains all of these very clearly; it is worth reading directly.

Summary

Although HTTP/2.0 brings these new features, I don't think they are all upside; there are some drawbacks:

  1. Take multiplexing: one socket carries n Streams, so if the socket happens to hit congestion control and data transfer slows down, every request on it is affected; the related kernel buffers also need to be larger, since there are more in-flight requests, and requests have priorities.
  2. The way I stumbled on this was fun, which is why I wrote it down. HTTP/2.0 is the next-generation HTTP protocol; in Chrome's Network tab, right-click the column header and enable Protocol, and you will see h2, which is HTTP/2.0, alongside http/1.1 and others. Google uses HTTP/2.0 almost everywhere, while Chinese sites such as Baidu and Kuaishou still mix HTTP/2.0 and HTTP/1.1.
  3. A new trick learned: after telnet host 80, you can type an HTTP request by hand (HTTP/1.0 only).

Reference

  1. Introduction to gRPC
  2. HTTP/2, Chapter 12 HTTP, High Performance Browser Networking

Introduction

This post walks through ZooKeeper's FastLeaderElection process.

Environment setup

First, download a release:

wget https://archive.apache.org/dist/zookeeper/zookeeper-3.4.12/zookeeper-3.4.12.tar.gz
tar zxvf zookeeper-3.4.12.tar.gz

To run in cluster mode we need to add per-node configuration, mainly confx/zoo.cfg and data/nodex/myid, where zoo.cfg looks like this:

tickTime=2000
dataDir=/Users/danwu/software/zookeeper-3.4.12/data/node1
clientPort=2181
initLimit=5
syncLimit=2
server.1=127.0.0.1:2881:3891
server.2=127.0.0.1:2882:3892
server.3=127.0.0.1:2883:3893

This sets up three nodes, all on the local machine (127.0.0.1). The two ports after the IP are, respectively, the port for Leader-Follower communication and the dedicated leader-election port. The dataDir directory holds the myid file containing the node's id, matching data/nodex/myid.
Start two of the nodes from the command line with:

java -Dzookeeper.log.dir=/Users/danwu/software/zookeeper-3.4.12/log2 -Dzookeeper.root.logger=INFO,CONSOLE -cp zookeeper-3.4.12.jar:/Users/danwu/software/zookeeper-3.4.12/bin/../lib/slf4j-log4j12-1.7.25.jar:/Users/danwu/software/zookeeper-3.4.12/bin/../lib/slf4j-api-1.7.25.jar:/Users/danwu/software/zookeeper-3.4.12/bin/../lib/netty-3.10.6.Final.jar:/Users/danwu/software/zookeeper-3.4.12/bin/../lib/log4j-1.2.17.jar:/Users/danwu/software/zookeeper-3.4.12/bin/../lib/jline-0.9.94.jar:/Users/danwu/software/zookeeper-3.4.12/bin/../lib/audience-annotations-0.5.0.jar:/Users/danwu/software/zookeeper-3.4.12/bin/../zookeeper-3.4.12.jar:/Users/danwu/software/zookeeper-3.4.12/bin/../src/java/lib/*.jar:/Users/danwu/software/zookeeper-3.4.12/bin/../conf: org.apache.zookeeper.server.quorum.QuorumPeerMain /Users/danwu/software/zookeeper-3.4.12/conf1/zoo.cfg

Then start the remaining node from IDEA so it can be debugged conveniently.

Leader election

ZooKeeper decomposes the consensus problem: leader election first settles on one Leader and several Followers; in all later operations, writes are forwarded directly or indirectly to the Leader, which guarantees consistency through a Proposal-Ack-Commit flow.
This post briefly covers leader election; the next one covers the read and write paths.

  • After startup, QuorumPeerMain calls runFromConfig, and QuorumPeer runs as a thread. When it finds the node in LOOKING state (the default after startup), it runs the election via makeLEStrategy().lookForLeader()
  • Execution then enters FastLeaderElection.lookForLeader(), which keeps sending notification messages to the other nodes until a Leader is elected and the roles are confirmed. Each message carries the sender's current vote, initially for itself; when a node receives a vote higher than its own, it adopts that vote. Votes are ordered by FastLeaderElection.totalOrderPredicate(), which essentially compares the election epoch, then the transaction id (zxid), then the server id
  • Once the election finishes, each node enters its role's workflow

Summary

When debugging, make sure the source you attach matches the downloaded version exactly, otherwise debugging behaves very strangely.

Reference

  1. Zookeeper 3.4 Documentation

Introduction

This post describes how ZooKeeper executes read and write operations. As is well known, ZooKeeper uses ZAB (ZooKeeper Atomic Broadcast) for leader-follower replication, achieving eventual consistency.
zookeeper overview
A client connects to one of the nodes. Read requests are served directly by the node the client is connected to; write requests are routed by that node to the Leader, which replicates the write to the other Followers and Learners via the Proposal - Ack - Commit flow before the result is returned through the client's node.

How the Leader handles a write

After leader election finishes, Leader.lead() is called. It creates a LearnerCnxAcceptor to accept Follower connections and starts the ZK server so it can accept client requests (startZkServer()).
From then on, requests arrive over two kinds of sockets, from Followers and from clients:

  • client connection setup: ZookeeperServer.processConnectRequest()
  • client request submission: ZookeeperServer.processPacket()
  • writes forwarded by Followers: LearnerHandler, via leader.zk.submitRequest(si)

Client connections enter through NIOServerCnxn/NettyServerCnxn, Follower connections through LearnerHandler. All of these requests are eventually handled by firstProcessor (LeaderRequestProcessor), then prepared by PrepRequestProcessor, after which ProposalRequestProcessor sends the proposal
and registers the request with CommitProcessor. Follower ACKs come back through LearnerHandler; the
Leader counts them in Leader.processAck(), and once QuorumVerifier.containsQuorum() reports a majority it sends a commit to all Followers and calls commitProcessor.commit(). The request then moves to nextProcessor, Leader.ToBeAppliedRequestProcessor, whose job is to remove the request from the toBeApplied list; before that, FinalRequestProcessor is invoked to write the change into the ZkDb, performing the actual modification.
The full processor chain is defined in LeaderZookeeperServer. The chain appears to "pause" at CommitProcessor, because advancing to the next processor depends on commit() being called, which is in turn triggered by the Followers.

How a Follower handles a write

Likewise, a Follower's write requests come either from clients or from the Leader:

  • client requests again arrive as connection requests via ZookeeperServer.processConnectRequest() and I/O requests via ZookeeperServer.processPacket(), entering through the ServerCnxnFactory
  • after taking its role, the Follower calls Follower.followLeader() and loops forever, calling Learner.readPacket() to read the Leader's messages and Follower.processPacket() to handle them

A client request is processed as follows:

  1. it is received via ZookeeperServer.processPacket() and ZookeeperServer.submitRequest()
  2. it is handled by firstProcessor, the FollowerRequestProcessor defined in FollowerZooKeeperServer, which forwards writes to the Leader and hands the request on to CommitProcessor; that caches the request in an array until the proposal-ack-commit flow (detailed below) finishes, after which FinalRequestProcessor stores the data

A message from the Leader goes through the Proposal-Ack-Commit flow:

  1. Follower.processPacket() receives the Leader's messages and dispatches on the type (PROPOSAL, COMMIT, and so on). A PROPOSAL is handled by calling FollowerZookeeperServer.logRequest() to log it; a COMMIT is handled by CommitProcessor.commit(), which validates the commit and then passes the request on to FinalRequestProcessor
    FinalRequestProcessor takes care of the final step, executing the actual operation: create, read, update, or delete

Summary

  • RequestProcessor is a textbook chain-of-responsibility pattern; it decouples the individual processing stages.
  • ZooKeeper creates a large number of threads internally and processes everything asynchronously: a request is typically appended to a processing queue, and a dedicated thread polls that queue in its run method.

Reference

  1. Zookeeper 3.4 Documentation