秘而不宣系列
Go中秘而不宣的数据结构 runq, 难怪运行时调度那么好
Go中秘而不宣的数据结构 spmc, 10倍性能于 channel
treap
是一棵二叉树,它同时维护二叉搜索树 (BST) 和堆的属性, 所以由此得名 (tree + heap ⇒ treap)。
从形式上讲,treap (tree + heap) 是一棵二叉树,其节点包含两个值,一个
key
和一个
priority
,这样
key
保持 BST 属性,
priority
是一个保持 heap 属性的随机值(至于是最大堆还是最小堆并不重要)。相对于其他的平衡二叉搜索树,treap 的特点是实现简单,且能基本实现随机平衡的结构。属于弱平衡树。
treap
由 Raimund Siedel 和 Cecilia Aragon 于 1989 年提出。
treap 通常也被称为“笛卡尔树”,因为它很容易嵌入到笛卡尔平面中:
具体来说,
treap
是一种在二叉树中存储键值对
(X,Y)
的数据结构,其特点是:按
X
值满足二叉搜索树的性质,同时按
Y
值满足二叉堆的性质。如果树中某个节点包含值
(X₀,Y₀)
,那么:
-
左子树中所有节点的 X 值都满足
X ≤ X₀
(BST 属性)
-
右子树中所有节点的 X 值都满足
X₀ ≤ X
(BST 属性)
-
左右子树中所有节点的 Y 值都满足 Y ≤ Y₀ (堆属性。这里以最大堆为例)
在这种实现中, X 是键(同时也是存储在 Treap 中的值),并且 Y 称为
优先级
。如果没有优先级,则 treap 将是一个常规的二叉搜索树。
优先级(前提是每个节点的优先级都不相同)的特殊之处在于:它们可以确定性地决定树的最终结构(不会受到插入数据顺序的影响)。这一点是可以通过相关定理来证明的。这里有个巧妙的设计:如果我们随机分配这些优先级值,就能在平均情况下得到一棵比较平衡的树(避免树退化成链表)。这样就能保证主要操作(如查找、插入、删除等)的时间复杂度保持在 O(log N) 水平。正是因为这种随机分配优先级的特点,这种数据结构也被称为"随机二叉搜索树"。

Treap 维护堆性质的方法用到了旋转,且只需要进行两种旋转操作,因此编程复杂度较红黑树、AVL 树要小一些。
红黑树的操作:
插入
以最大堆为例
给节点随机分配一个优先级,先和二叉搜索树的插入一样,先把要插入的点插入到一个叶子上,然后跟维护堆一样进行以下操作:
-
如果当前节点的优先级比父节点大就进行 2. 或 3. 的操作
-
-
删除
因为 treap 满足堆性质,所以只需要把要删除的节点旋转到叶节点上,然后直接删除就可以了。具体的方法就是每次找到优先级最大的子叶,向与其相反的方向旋转,直到那个节点被旋转到了叶节点,然后直接删除。
查找
和一般的二叉搜索树一样,但是由于 treap 的随机化结构,Treap 中查找的期望复杂度是
O(logn)
以上是 treap 数据结构的背景知识,如果你想了解更多关于 treap 的知识,你可以参考
-
https://en.wikipedia.org/wiki/Treap
-
https://medium.com/carpanese/a-visual-introduction-to-treap-data-structure-part-1-6196d6cc12ee
-
https://cp-algorithms.com/data_structures/treap.html
Go 运行时的 treap 和用途
在 Go 运行时
sema.go#semaRoot
[1]
中,定义了一个数据结构
semaRoot
:
type semaRoot struct {
lock mutex
treap *sudog // 不重复的等待者(goroutine)的平衡树(treap)的根节点
nwait atomic.Uint32 // 等待者(goroutine)的数量
}
type sudog struct {
g *g
next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)
acquiretime int64
releasetime int64
ticket uint32
isSelect bool
success bool
waiters uint16
parent *sudog // semaRoot binary tree
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}
这是 Go 语言互斥锁(Mutex)底层实现中的关键数据结构,用于管理等待获取互斥锁的 goroutine 队列。我们已经知道,在获取
sync.Mutex
时,如果锁已经被其它 goroutine 获取,那么当前请求锁的 goroutine 会被 block 住,就会被放入到这样一个数据结构中 (所以你也知道这个数据结构中的 goroutine 都是唯一的,不重复)。
semaRoot
保存了一个平衡树,树中的
sudog
节点都有不同的地址
(s.elem)
,每个
sudog
可能通过
s.waitlink
指向一个链表,该链表包含等待相同地址的其他
sudog
。对具有相同地址的
sudog
内部链表的操作时间复杂度都是 O(1).。扫描顶层 semaRoot 列表的时间复杂度是
O(log n)
,其中
n
是具有被阻塞 goroutine 的不同地址的数量(这些地址会散列到给定的 semaRoot)。
semaRoot
的
treap *sudog
其实就是一个 treap, 我们来看看它的实现。
增加一个元素(入队)
增加一个等待的 goroutine(
sudog
)到
semaRoot
的
treap
中,如果
lifo
为
true
,则将
s
替换到
t
的位置,否则将
s
添加到
t
的等待列表的末尾。
func (root *semaRoot) queue(addr *uint32, s *sudog, lifo bool) {
// 设置这个要加入的节点
s.g = getg()
s.elem = unsafe.Pointer(addr)
s.next = nil
s.prev = nil
s.waiters = 0
var last *sudog
pt := &root.treap
// 从根节点开始
for t := *pt; t != nil; t = *pt { // ①
// 如果地址已经在列表中,则加入到这个地址的链表中
if t.elem == unsafe.Pointer(addr) {
// 如果地址已经在列表中,并且指定了先入后出flag,这是一个替换操作
if lifo {
// 替换操作
*pt = s
s.ticket = t.ticket
... // 把t的各种信息复制给s
} else {
// 增加到到等待列表的末尾
if t.waittail == nil {
t.waitlink = s
} else {
t.waittail.waitlink = s
}
t.waittail = s
s.waitlink = nil
if t.waiters+1 != 0 {
t.waiters++
}
}
return
}
last = t
// 二叉搜索树查找
if uintptr(unsafe.Pointer(addr)) uintptr(t.elem) { // ②
pt = &t.prev
} else {
pt = &t.next
}
}
// 为新节点设置ticket.这个ticket是一个随机值,作为随机堆的优先级,用于保持treap的平衡。
s.ticket = cheaprand() | 1 // ③
s.parent = last
*pt = s
// 根据优先级(ticket)旋转以保持treap的平衡
for s.parent != nil && s.parent.ticket > s.ticket { // ④
if s.parent.prev == s {
root.rotateRight(s.parent) // ⑤
} else {
if s.parent.next != s {
panic("semaRoot queue")
}
root.rotateLeft(s.parent) // ⑥
}
}
}
① 是遍历 treap 的过程,当然它是通过搜索二叉树的方式实现。
addr
就是我们一开始讲的 treap 的 key,也就是
s.elem
。首先检查
addr
已经在 treap 中,如果存在,那么就把
s
加入到
addr
对应的
sudog
链表中,或者替换掉
addr
对应的
sudog
。
这个
addr
, 如果对于
sync.Mutex
来说,就是
Mutex.sema
字段的地址。
type Mutex struct {
state int32
sema uint32
}
所以对于阻塞在同一个
sync.Mutex
上的 goroutine,他们的
addr
是相同的,所以他们会被加入到同一个
sudog
链表中。如果是不同的
sync.Mutex
锁,他们的
addr
是不同的,那么他们会被加入到这个 treap 不同的节点。
进而,你可以知道,这个
rootSema
是维护多个
sync.Mutex
的等待队列的,可以快速找到不同的
sync.Mutex
的等待队列,也可以维护同一个
sync.Mutex
的等待队列。这给了我们启发,如果你有类似的需求,可以参考这个实现。
③ 就是设置这个节点的优先级,它是一个随机值,用于保持 treap 的平衡。这里有个技巧就是总是把优先级最低位设置为 1,这样保证优先级不为 0.因为优先级经常和 0 做比较,我们将最低位设置为 1,就表明优先级已经设置。
④ 就是将这个新加入的节点旋转到合适的位置,以保持 treap 的平衡。这里的旋转操作就是上面提到的左旋和右旋。稍后看。
移除一个元素(出队)
对应的,还有出对的操作。这个操作就是从 treap 中移除一个节点,这个节点就是一个等待的 goroutine(
sudog
)。
dequeue
搜索并找到在
semaRoot
中第一个因
addr
而阻塞的
goroutine
。比如需要唤醒一个 goroutine, 让它继续执行(比如直接将锁交给它,或者唤醒它去争抢锁)。
func (root *semaRoot) dequeue(addr *uint32) (found *sudog, now, tailtime int64) {
ps := &root.treap
s := *ps
for ; s != nil; s = *ps { // ①, 二叉搜索树查找
if s.elem == unsafe.Pointer(addr) { // ②
goto Found
}
if uintptr(unsafe.Pointer(addr)) uintptr(s.elem) {
ps = &s.prev
} else {
ps = &s.next
}
}
return nil, 0, 0
Found: // ③
...
if t := s.waitlink; t != nil { // ④
*ps = t
...
} else { // ⑤
// 旋转s到叶节点,以便删除
for s.next != nil || s.prev != nil {
if s.next == nil || s.prev != nil && s.prev.ticket root.rotateRight(s)
} else {
root.rotateLeft(s)
}
}
// ⑤ 删除s
if s.parent != nil {
if s.parent.prev == s {
s.parent.prev = nil
} else {
s.parent.next = nil
}
} else {
root.treap = nil
}
tailtime = s.acquiretime
}
... // 清理s的不需要的信息
return s, now, tailtime
}
① 是遍历 treap 的过程,当然它是通过搜索二叉树的方式实现。
addr
就是我们一开始讲的 treap 的 key,也就是
s.elem
。如果找到了,就跳到
Found
标签。如果没有找到,就返回
nil
。
④ 是检查这个地址上是不是有多个等待的 goroutine,如果有,就把这个节点替换成链表中的下一个节点。把这个节点从 treap 中移除并返回。如果就一个 goroutine,那么把这个移除掉后,需要旋转 treap,直到这个节点被旋转到叶节点,然后删除这个节点。
这里的旋转操作就是上面提到的左旋和右旋。
左旋 rotateLeft
rotateLeft
函数将以
x
为根的子树左旋,使其变为
y
为根的子树。左旋之前的结构为
(x a (y b c))
,旋转后变为
(y (x a b) c)
。
func (root *semaRoot) rotateLeft(x *sudog) {
// p -> (x a (y b c))
p := x.parent
y := x.next
b := y.prev
y.prev = x // ①
x.parent = y // ②
x.next = b // ③
if b != nil {
b.parent = x // ④
}
y.parent = p // ⑤
if p == nil {
root.treap = y // ⑥
} else if p.prev == x { // ⑦
p.prev = y
} else {
if p.next != x {
throw("semaRoot rotateLeft")
}
p.next = y
}
}
具体步骤:
-
将
y
设为
x
的父节点(②),
x
设为
y
的左子节点(①)。
-
将
b
设为
x
的右子节点(③),并更新其父节点为
x
(④)。
-
更新
y
的父节点为
p
(⑤),即
x
的原父节点。如果
p
为 nil,则 y 成为新的树根(⑥)。
-
根据
y
是
p
的左子节点还是右子节点,更新对应的指针(⑦)。
左旋为
右旋 rotateRight