一、rax概述
rax全称为 Radix Tree,也叫基数树,属于前缀树的一种。前缀树 的特点是适合保存具有相同前缀的数据,从而实现节省内存空间的目标,以及支持范围查询。前缀树的结构如下图所示,这是一个基础表示。
(f) ""
\
(o) "f"
\
(o) "fo"
\
[t b] "foo"
/ \
"foot" (e) (a) "foob"
/ \
"foote" (r) (r) "fooba"
/ \
"footer" [] [] "foobar"
如上,这个图表示了 foo、 footer 和 foobar 这三个字符串, 当节点表示基数树中的key时,我们将其写入位于方括号 [] 之间,否则位于括号 () 之间。
这里有个常见优化点,对于连续只有单个子节点的node可以压缩成一个 「压缩节点」,因此,上面的树形图变成了下面这种
["foo"] ""
|
[t b] "foo"
/ \
"foot" ("er") ("ar") "foob"
/ \
"footer" [] [] "foobar"
不过,这种树形图实现上却是有点麻烦,比如 当字符串 “first” 要插入上面的 Radix Tree 中,这里就涉及到节点切割了,因为此时 “foo” 就不再是一个公共前缀了,最终树形图如下:
(f) ""
/
(i o) "f"
/ \
"firs" ("rst") (o) "fo"
/ \
"first" [] [t b] "foo"
/ \
"foot" ("er") ("ar") "foob"
/ \
"footer" [] [] "foobar"
看这三个图的变化,你会发现,每次要查找"foo"字符串,被标记为key的节点都不太一样,这是源文件redis.h中的注释画的图形。我们可以先保留这个疑问,接着看实现代码,尝试从代码中找答案。
二、rax结构
rax是一种树形结构,定义如下:
typedef struct rax {
raxNode *head; // rax的根节点,通常不保存任何字符串数据,也不代表任何 key
uint64_t numele; // 当前树中存储的 key 的数量
uint64_t numnodes; // 整个 rax 中的节点总数(raxNode 个数)
size_t *alloc_size; // 记录整个 rax 分配的内存总大小
void *metadata[]; // 一个可变长度数组,用来存储额外信息
} rax;
可以看到rax的节点是 raxNode 结构。定义如下:
typedef struct raxNode {
uint32_t iskey:1; // 节点是否包含key
uint32_t isnull:1; // 节点的值是否为NULL
uint32_t iscompr:1; // 是否是压缩节点
uint32_t size:29; // 节点大小
unsigned char data[]; // 节点的实际存储数据
} raxNode;
该结构中的成员变量包括 4 个元数据,这四个元数据的含义分别如下。
- iskey:表示从 Radix Tree 的根节点到当前节点路径上的字符组成的字符串,是否表示了一个完整的 key。如果是的话,那么 iskey 的值为 1。否则,iskey 的值为 0。不过,这里需要注意的是,当前节点所表示的 key,并不包含该节点自身的内容。
- isnull:表示当前节点是否为空节点。如果当前节点是空节点,那么该节点就不需要为指向 value 的指针分配内存空间了。
- iscompr:表示当前节点是非压缩节点,还是压缩节点。
- size:表示当前节点的大小,具体值会根据节点是压缩节点还是非压缩节点而不同。如果当前节点是压缩节点,该值表示压缩数据的长度;如果是非压缩节点,该值表示该节点指向的子节点个数
除了元数据,该结构体中还有 char 类型数组 data。需要知道的是,data 是用来保存实际数据的。不过,这里保存的数据会根据当前节点的类型而有所不同:
- 对于非压缩节点来说,data 数组包括子节点对应的字符、指向子节点的指针,以及节点表示 key 时对应的 value 指针;
- 对于压缩节点来说,data 数组包括子节点对应的合并字符串、指向子节点的指针,以及节点为 key 时的 value 指针。
到这里,你可能已经发现,在 raxNode 的实现中,无论是非压缩节点还是压缩节点,其实具有两个特点:
- 它们所代表的 key,是从根节点到当前节点路径上的字符串,但并不包含当前节点;
- 它们本身就已经包含了子节点代表的字符或合并字符串。而对于它们的子节点来说,也都属于非压缩或压缩节点,所以,子节点本身又会保存,子节点的子节点所代表的字符或合并字符串。
单从文字上的描述来理解,你可能依然觉得非常抽象,没关系,继续向下读,到插入元素时就可以理解了。
三、rax辅助结构raxStack
raxStack 是 rax内部用的一个轻量级栈,主要用于:在遍历 / 查找 / 插入过程中,记录路径(节点栈)。定义一个栈的原因如下:插入时可能需要 回溯(split / 合并)、删除时可能需要 向上清理、遍历时需要 返回父节点。
#define RAX_STACK_STATIC_ITEMS 32
typedef struct raxStack {
void **stack; // 指向当前使用的“栈数组”
size_t items, maxitems; // 当前栈中有多少节点 & 当前栈的总容量
void *static_items[RAX_STACK_STATIC_ITEMS]; // 一个内嵌的小数组,避免小规模使用时频繁 malloc
int oom; // 是否发生过内存分配失败(Out Of Memory)
} raxStack;
接下来我们看下栈的初始化以及入栈和出栈的实现:
3.1、栈的初始化
static inline void raxStackInit(raxStack *ts) {
ts->stack = ts->static_items; // 将当前使用的栈数据指向内部的数组
ts->items = 0; // 设置当前栈中的节点数为0
ts->maxitems = RAX_STACK_STATIC_ITEMS; // 设置栈的总容量大小为32
ts->oom = 0; // 记录栈申请内存失败的次数为0
}
从初始化可以看出,Redis为了提升性能,在栈中内置了一个小数组,当栈中元素小于32时是不需要向操作系统申请内存空间的。
3.2、入栈
入栈的函数实现是 raxStackPush,如果成功就返回1,如果oom就返回0,代码如下:
static inline int raxStackPush(raxStack *ts, void *ptr) {
if (ts->items == ts->maxitems) { // 如果栈的节点数量已经达到最大,即和容量一样大时
if (ts->stack == ts->static_items) { // 如果目前使用的是内置数组,即当前栈中节点总数量为32
ts->stack = rax_malloc(sizeof(void*)*ts->maxitems*2); // 申请比当前容量扩大一倍的内存大小
if (ts->stack == NULL) { // 申请失败返回0
ts->stack = ts->static_items;
ts->oom = 1;
errno = ENOMEM;
return 0;
}
memcpy(ts->stack,ts->static_items,sizeof(void*)*ts->maxitems); // 把内置数组中的数据拷贝到新申请的内存空间(不再使用内置数组)
} else { // 这个分支代表当前栈中节点总数量已经超过32
void **newalloc = rax_realloc(ts->stack,sizeof(void*)*ts->maxitems*2);
if (newalloc == NULL) {
ts->oom = 1;
errno = ENOMEM;
return 0;
}
ts->stack = newalloc;
}
ts->maxitems *= 2;
}
ts->stack[ts->items] = ptr; // 给栈中数组的下一个指针设置为 入栈的节点 ptr
ts->items++; // 总节点数量 + 1
return 1;
}
3.2、出栈
出栈的操作相对简单,实现函数为 raxStackPop。
static inline void *raxStackPop(raxStack *ts) {
if (ts->items == 0) return NULL; // 如果栈内已经没数据,直接返回NULL
ts->items--; // 栈内节点总数减1
return ts->stack[ts->items]; // 返回当前栈顶的元素,即数组中节点总数-1 的索引位置
}
通过这个代码我们可以看出,raxStack使用数组保存节点指针,出栈操作只是把栈中节点总数减去1,通过数组索引来定位栈顶。这种实现方法并没有删除元素,下次入栈时直接覆盖即可。
3.3、返回栈顶元素
返回栈顶元素和出栈实现比较类似,补贴的是,它只返回元素,但是不会实际的操作栈顶。如下所示:
static inline void *raxStackPeek(raxStack *ts) {
if (ts->items == 0) return NULL;
return ts->stack[ts->items-1];
}
3.4、释放栈
当内置数组不够是,我们需要申请堆内存来存放栈的数据,那在释放时这些内存也是需要释放的,代码如下:
static inline void raxStackFree(raxStack *ts) {
if (ts->stack != ts->static_items) rax_free(ts->stack);
}
四、rax的基本操作
我们还是先从rax的创建开始来看下是如何实现的。
4.1、rax的创建
创建一个新的rax通过 raxNew 实现,它底层调用了 raxNewWithMetadata,代码如下:
rax *raxNewWithMetadata(int metaSize, size_t *alloc_size) {
size_t usable;
rax *rax = rax_malloc_usable(sizeof(*rax) + metaSize, &usable); // 申请内存,并记录实际可以内存大小(操作系统一般会按照字节对齐比实际需求申请的大)
if (rax == NULL) return NULL;
rax->numele = 0; // 设置key数量为0
rax->numnodes = 1; // 设置节点数量为1,即trax结构本身包含的根节点
rax->alloc_size = alloc_size; // 如果传入了内存大小就直接给属性赋值
if (rax->alloc_size) *rax->alloc_size += usable; // 如果没有传入即不需要携带额外数据,就把usable赋给rax->alloc_size
rax->head = raxNewNode(rax, 0, 0); // 创建根节点
if (rax->head == NULL) { // 如果根节点申请失败,就释放内存
if (rax->alloc_size) *rax->alloc_size -= usable;
rax_free(rax);
return NULL;
} else {
return rax;
}
}
rax *raxNew(void) {
return raxNewWithMetadata(0, NULL);
}
如上所示,raxNew就是直接调用了 raxNewWithMetadata,传入参数 0 和 NULL, 即没有额外信息。 在创建rax的过程中,首先需要创建新节点,代码如下:
raxNode *raxNewNode(rax *rax, size_t children, int datafield) {
size_t nodesize = sizeof(raxNode)+children+raxPadding(children)+
sizeof(raxNode*)*children;
if (datafield) nodesize += sizeof(void*); // 计算所需要的内存大小
size_t usable;
raxNode *node = rax_malloc_usable(nodesize,&usable); // 申请内存
if (node == NULL) return NULL;
node->iskey = 0; // 新节点初始化
node->isnull = 0;
node->iscompr = 0;
node->size = children; // 设置节点的子节点数量
if (rax->alloc_size) *rax->alloc_size += usable; // 这个节点的申请内存记录到 rax->alloc_size 上,用于统计rax结果使用的总内存
return node;
}
如上所示,创建根节点时传入的参数是 0, 即一个新的rax结构根节点还没有子节点和数据域。
4.2、查询元素
这里我们先介绍查询,是因为在插入元素时也需要先做查询操作,找到新元素应该的插入的位置。rax提供了查找key的接口raxFind,该接口用于获取key对应的value值,定义如下:
int raxFind(rax *rax, unsigned char *s, size_t len, void **value) {
raxNode *h;
int splitpos = 0;
size_t i = raxLowWalk(rax,s,len,&h,NULL,&splitpos,NULL);
if (i != len || (h->iscompr && splitpos != 0) || !h->iskey)
return 0;
if (value != NULL) *value = raxGetData(h); // 通过raxGetData获取到key对应的value
return 1;
}
这个函数实现的功能是 在rax中查找长度为 len 的字符串 s (s为rax中的key),找到之后返回 key 对应的 value。 核心调用了 raxLowWalk 方法,这个 方法返回 s 的匹配长度,当 s ! = len 时,表示未查找到该key;当s == len时,需要检验 stopnode 是否为 key,并且当 stopnode 为压缩节点时,还需要检查splitpos是否为0(可能匹配到某个压缩节点中间的某个元素)。
接下来我们来看 raxLowWalk 是如何定义的:
static inline size_t raxLowWalk(rax *rax, unsigned char *s, size_t len, raxNode **stopnode, raxNode ***plink, int *splitpos, raxStack *ts) {
raxNode *h = rax->head; // 从根节点来时匹配
raxNode **parentlink = &rax->head; // 指向“当前节点指针的地址”
size_t i = 0; // 当前匹配到字符串 s 的第几个字符
size_t j = 0; // 当前在 node 内部的位置:压缩节点 → 在字符串里 普通节点 → 在 children 数组里
while(h->size && i < len) { // 主循环,循环条件是当前节点有内容,目标字符串未匹配完
unsigned char *v = h->data; // 取出当前节点的数据
if (h->iscompr) { // 如果是压缩节点,一个节点代表多个字符
for (j = 0; j < h->size && i < len; j++, i++) { // 一次性匹配一段字符串:
if (v[j] != s[i]) break;
}
if (j != h->size) break; // 判断是否完全匹配, 如果中途不匹配就break
} else { // 普通节点,一个节点有多个分支(children)
for (j = 0; j < h->size; j++) { // 查找匹配的子节点
if (v[j] == s[i]) break;
}
if (j == h->size) break; // 没有对应分支 → 查找结束
i++; // 找到了, i++ 消耗目标字符串一个字符
}
if (ts) raxStackPush(ts,h); // 把当前节点压入栈, 用于插入时回溯或删除时修复
raxNode **children = raxNodeFirstChildPtr(h); // 跳到子节点 , 获取 children 指针数组
if (h->iscompr) j = 0; // 压缩节点:只有一个子节点 固定在 index 0
memcpy(&h,children+j,sizeof(h)); //
parentlink = children+j; // 当前节点在父节点里的位置
j = 0;
}
if (stopnode) *stopnode = h;
if (plink) *plink = parentlink;
if (splitpos && h->iscompr) *splitpos = j;
return i; // 成功匹配了多少字符
}
实现逻辑大体:
- 从根节点开始匹配,直到当前待查找节点无子节点或者 s 查找完毕。对于每个节点来说,如果为压缩节点,则需要与 s 中的字符完全匹配。如果为非压缩节点,则查找与当前待匹配字符相同的字符。
- 如果当前待匹配节点能够与 s 匹配,则移动位置到其子节点,继续匹配,直至匹配完成。
总结,raxLowWalk 沿着 key 在 radix tree 里尽可能往下走,并返回匹配长度 + 停止位置 + 修改入口 + 路径信息”。
4.3、插入元素
向 rax 中插入 key-value 对,对于已存在的 key, rax 提供了2种方案,覆盖或者不覆盖原有的 value,对应的接口分别为 raxInsert、raxTryInsert。真正实现函数 raxGenericInsert,函数原型如下:
int raxGenericInsert(rax *rax, unsigned char *s, size_t len, void *data, void **old, int overwrite)
插入的过程比较复杂,仅仅该函数的实现就有几百行,我们不在这里进行代码逐行张贴并解析,源码中有大量的注释。我们结合一个case来看一下在一个空的rax中插入一个key foo 是如何实现的。
foo作为rax中的第一个字符串,在插入前rax时空的,我们来看一下具体插入流程:
- 通过raxLowWalk查找 foo 是否存在。返回结果i为0,在rax匹配到了0个字符,记录h是根节点,parentlink指向根节点的指针。
- foo 会最为第一个节点插入,因为没有任何分支,它作为一个压缩节点插入。
- 遍历第一个字符 f,此时头字节的size为0,即没有子节点。申请一个新的压缩节点,并同时给这个压缩节点申请一个唯一的子节点,子节点为空
- 把新节点通过内存拷贝,把新申请的压缩节点拷贝到根节点的位置。即这个新节点成为新的根节点。
- 设置空子节点的iskey属性为1,并将它的value 设置给子节点。
该分支代码如下:
while(i < len) { // 按照字符长度循环
raxNode *child;
if (h->size == 0 && len-i > 1) { // 走这个分支,因为头节点的size=0,此时
size_t comprsize = len-i; // 计算压缩节点的长度
raxNode *newh = raxCompressNode(rax,h,s+i,comprsize,&child); // 申请一个新的压缩节点,并给这个压缩节点创建唯一的子节点,这个新的子节点为空。
h = newh;
memcpy(parentlink,&h,sizeof(h));
parentlink = raxNodeLastChildPtr(h); // 把 parentlink 移动到当前新节点 h 的“子节点指针槽位”,方便下一步继续往下挂节点
i += comprsize;
}
......
rax->numnodes++; // rax的节点数量+1
h = child; // h指向压缩节点的子节点
}
...
raxNode *newh = raxReallocForData(rax,h,data); // 给唯一的空子节点申请空间
h = newh;
if (!h->iskey) rax->numele++;
raxSetData(h,data); // 这里把子节点设置为key,并且把key对应的value值存储到这个空子节点。
memcpy(parentlink,&h,sizeof(h));
return 1; /* Element inserted. */
可以看到代码内容,当插入第一个字符串时,它会作为一个压缩节点插入,并把这个压缩节点作为根节点。此时rax状态如下:
("foo") (compress size=3)
|
[] (iskey, size=0, value)
根节点是压缩节点保存了 foo,它具有唯一的子节点,设置为空节点,iskey,value保存到这个空节点上,这样变量到这个空节点点,就指定前边走过的路径 foo 就是一个key。这里的描述要从 raxCompressNode 的实现才能看出来。
如上说是,仅仅给一个空的rax插入第一个key的实现就如此复杂,当插入第二个元素,第三个元素需要进行合并、分裂操作更加复杂,这里不再去扣这些细节,通过本文我们需要了解rax的结构是什么样的就足够了。
4.4、删除元素
rax 的删除操作主要有3个接口,可以删除 rax 中的某个 key,或者释放整个 rax,在释放 rax 时,还可以设置释放回调函数,在释放rax 的每个 key 时,都会调用这个回调函数,3个接口的定义如下:
// 在 rax 中删除长度为 len 的 s
int raxRemove(rax *rax, unsigned char *s, size_t len, void **old);
// 释放 rax
void raxFree(rax *rax);
// 释放 rax, 释放每个 key 时,都会调用 free_callback 方法
void raxFreeWithCallback(rax *rax, void (*free_callback)(void*));
其中,rax 的释放操作,采用的是深度优先算法。下面重点介绍 raxRemove 方法:
第一步:当删除 rax 中的某个 key-value 对时,首先查找 key 是否存在,不存在则直接返回,存在则需要进行删除操作,实现如下:
raxNode *h;
raxStack ts;
raxStackInit(&ts); // 保存路径
int splitpos = 0;
size_t i = raxLowWalk(rax,s,len,&h,NULL,&splitpos,&ts);
if (i != len || (h->iscompr && splitpos != 0) || !h->iskey) { // 未找到目标 key, 直接退出
raxStackFree(&ts);
return 0;
}
第二步,如果 key 存在,则需要进行删除操作,删除操作完成后,rax 树可能需要进行压缩。具体可以分为下面 2 种情况,此处所说的压缩是指将某个节点与其子节点压缩成一个节点,叶子节点没有子节点,不能进行压缩。
- 某个节点只有一个子节点,该子节点之前是key,经删除操作后不再是key,此时可以将该节点与其子节点压缩。
- 某个节点有两个子节点,经过删除操作后,只剩下一个子节点,如果这个子节点不是key,则可以将该节点与这个子节点压缩
4.5、rax迭代器
rax提供了迭代器用于对rax进行遍历,结构如下:
typedef struct raxIterator {
int flags; // 控制迭代行为的标志位, 控制是否 EOF(结束)、是否处于 seek 状态等等
rax *rt; // 当前遍历的 radix tree
unsigned char *key; // 前遍历到的 key(字符串)
void *data; // 当前 key 对应的 value
size_t key_len; // 当前 key 的长度
size_t key_max; // key buffer 当前最大容量
unsigned char key_static_string[RAX_ITER_STATIC_LEN]; // 一个小的 栈上 buffer
raxNode *node; // 当前所在节点,只在 unsafe iteration(非安全迭代) 时使用
raxStack stack; // 遍历路径栈(核心)
raxNodeCallback node_cb; // 节点回调函数(可选)
void *privdata; // 传给 node_cb 的上下文数据
} raxIterator;
Redis 中实现的迭代器为双向迭代器,可以向前,也可以向后,顺序是按照 key 的字典序排列的。通过 rax 的结构图可以看出,如果某个节点为 key,则其子节点的 key 按照字典序比该节点的 key 大。另外,如果当前节点为非压缩节点,则其最左侧节点的key是其所有子节点的 key 中最小的。主要方法有:
void raxStart(raxIterator *it, rax *rt);
int raxSeek(raxIterator *it, const char *op, unsigned char *ele, size_t len);
int raxNext(raxIterator *it);
int raxPrev(raxIterator *it);
- raxStart 用于初始化 raxIterator 结构:
- raxSeek,在raxStart初始化迭代器后,必须调用raxSeek函数初始化迭代器的位置。
- raxNext & raxPrev,主要用于迭代上一个或者下一个key
- raxStop & raxEOF: raxEOF用于标识迭代器迭代结束,raxStop用于结束迭代并释放相关资源:
五、总结
首先,要理解 rax 底层实现,第一步需要理解 rax 的结构;其次,要理解 压缩节点 和 非压缩节点 的区别来,再回顾下 压缩节点 和 非压缩节点 的区别: 它们的相同之处在于:
- 都有保存元数据的节点头 HDR;
- 都会包含指向子节点的指针,
- 以及子节点所代表的字符串。从根节点到当前节点路径上的字符串如果是 Radix Tree 的一个 key,它们都会包含指向 key 对应 value 的指针。
不同之处在于:
- 非压缩节点指向的子节点,每个子节点代表一个字符,非压缩节点可以指向多个子节点;
- 压缩节点指向的子节点,代表的是一个合并字符串,压缩节点只能指向一个子节点。
「真诚赞赏,手留余香」
真诚赞赏,手留余香
使用微信扫描二维码完成支付