(相关资料图)
B+树是一棵多叉排序树,即每个非叶子节点可以包含多个子节点,其整体结构呈扁平化,所以其非常适配于数据库和操作系统的文件系统中。且B+树能够保持数据的稳定有序,插入和删除都拥有较稳定的对数时间复杂度。
B+树的特性:以 m 阶为例,m 表示内部节点即非叶子节点可以包含的最大子节点个数 maximumNum
Math.ceil(m / 2)
个子节点,这是因为B+树的生成顺序导致的(m+1)/2
,另一个为(m+2)/2
Math.ceil(m / 2)
目前实现的B+树的简易版,叶子节点是存储的Entry
键值对,内部节点存储的是Integer
索引,后续有时间再进行泛型的通用扩展。
package bplustree;public abstract class Node { InternalNode parent; // 父节点 public Node() {} public abstract boolean isValid(); // 判断删除节点后各B+树节点是否满足要求 public abstract boolean isAvailable(); // 判断B+树节点是否可以分裂节点给其他节点 public abstract boolean isMergeable(); // 判断B+树节点是否可以和其他节点合并}
public class InternalNode extends Node{ int maxChildNodes; // 子节点个数最大值 m,m为阶数 int minChildNodes; // 除根节点及叶子节点外,子节点个数最小值 ceil(m / 2) int curNodesNum; // 内部节点当前的子节点个数 InternalNode leftSibling; // 左兄弟节点 InternalNode rightSibling; // 右兄弟节点 Integer[] keys; // 内部节点当前的索引值,最多有 m - 1 个 Node[] childPointers; // 内部节点当前的子节点,最多有 m 个}
public class LeafNode extends Node { int maximumNum; // 叶子节点最多元素个数 m - 1 int minimumNum; int curNum; // 叶子节点当前的元素个数 LeafNode leftSibling; // 左兄弟和右兄弟形成双向链表 LeafNode rightSibling; Entry[] entries; // 叶子节点键值对,不仅存储索引值,也存储其他值信息}class Entry implements Comparable { Integer key; String value; public Entry(Integer key, String value) { this.key = key; this.value = value; } @Override public int compareTo(Entry o) { return key.compareTo(o.key); }}
public class BPlusTree { int m; InternalNode root; // B+树根节点 LeafNode head; // 叶子节点的首元素}
B+树的查找过程:根据查找的索引值k,从根节点向叶子节点搜索,对数时间复杂度。
public String search(int key) { if (isEmpty()) { return null; } // 树查找 LeafNode leafNode = (this.root == null) ? this.head : findLeafNode(key); Entry[] entries = leafNode.entries; // 叶子节点内进行二分查找 int index = binarySearch(entries, leafNode.curNum, key, null); if (index == -1) { return null; } else { return entries[index] }}// 从根节点开始查找private LeafNode findLeafNode(Integer key) { return findLeafNode(root, key);}// 找到索引值所在的叶子节点private LeafNode findLeafNode(InternalNode internalNode, Integer key) { Integer[] keys = internalNode.keys; int i; for (i = 0; i < internalNode.curNodesNum - 1; i++) { if (key.compareTo(keys[i]) < 0) { break; } } Object child = internalNode.childPointers[i]; if (child instanceof LeafNode) { return (LeafNode) child; } else { return findLeafNode((InternalNode) child, key); }}
B+树区间查询左值可能在的叶子节点位置,然后通过双向链表向后遍历。
// 闭区间 [left, right]public ArrayList searchRange(int left, int right) { List values = new ArrayList<>(); LeafNode leafNode = findLeafNode(left); // 查找左值可能存在的位置,并从该位置向后遍历 boolean flag = true; while (leafNode != null && flag) { Entry[] entries = leafNode.entries; for (Entry entry : entries) { if (entry == null) { break; } if ( entry.key > right) { flag = false; break; } if (left <= entry.key && right >= entry.key) { values.add(entry.value); } } leafNode = leafNode.rightSibling; } return values;}
B+树的插入操作仅在叶子节点进行:
- 若为空树,则创建一个叶子节点,该叶子节点同时也是根节点,插入操作结束;
- 根据插入的 key 值,找到应该在的叶子节点插入;
- 若插入后叶子节点个数符合要求即小于m,则插入结束
- 若插入后叶子节点个数不符合要求即大于等于m,将该节点分裂成两半,则判断当前叶子节点是否为根节点
- 若当前叶子节点为根节点,则构建一个新的root节点,指向分裂后的两个子节点
- 若当前叶子节点不为根节点,则在父节点处添加一个新的子节点,新子节点则存储原节点一半的值,并循环向上判断中间节点是否满足要求
public void insert(int key, String value) { if (isEmpty()) { this.head = new LeafNode(this.m, new Entry(key, value)); } else { LeafNode leafNode = (this.root == null) ? this.head : findLeafNode(key); // 插入叶子节点失败,即叶子节点中存储已到达上限 if (!leafNode.insert(new Entry(key, value))) { leafNode.entries[leafNode.curNum++] = new Entry(key, value); sortEntries(leafNode.entries); // 叶子节点分裂的位置 int mid = getIndexOfMidPointer(); Entry[] halfEntry = splitEntries(leafNode, mid); // 若叶子节点为根节点,即parent为null if (leafNode.parent == null) { Integer[] parent_keys = new Integer[m]; parent_keys[0] = halfEntry[0].key; // 创建新的 root InternalNode parent = new InternalNode(m, parent_keys); leafNode.parent = parent; parent.appendChildPointer(leafNode); } // 若叶子节点不为根节点 else { int newParentKey = halfEntry[0].key; leafNode.parent.keys[leafNode.parent.curNodesNum - 1] = newParentKey; Arrays.sort(leafNode.parent.keys, 0, leafNode.parent.curNodesNum); } // 分裂后的另一半叶子节点,添加到父节点 LeafNode newLeafNode = new LeafNode(this.m, halfEntry, leafNode.parent); // 分裂后的另一半叶子节点对应的下标 int index = leafNode.parent.getIndexOfPointer(leafNode) + 1; for (int i = index; i < leafNode.parent.childPointers.length - 1; i++) { leafNode.parent.childPointers[i + 1] = leafNode.parent.childPointers[i]; } leafNode.parent.childPointers[index] = newLeafNode; // 关联兄弟节点 newLeafNode.rightSibling = leafNode.rightSibling; if (newLeafNode.rightSibling != null) { newLeafNode.rightSibling.leftSibling = newLeafNode; } leafNode.rightSibling = newLeafNode; newLeafNode.leftSibling = leafNode; if (this.root == null) { this.root = leafNode.parent; } else { // 逐渐上浮,判断插入是否会导致B+树内部节点不符合要求 InternalNode internalNode = leafNode.parent; while (internalNode != null) { if (internalNode.isOverfull()) { splitInternalNode(internalNode); } else { break; } internalNode = internalNode.parent; } } } }}/** * 叶子节点插入,导致的上层内部节点分裂 */private void splitInternalNode(InternalNode internalNode) { InternalNode parent = internalNode.parent; int mid = getIndexOfMidPointer(); Integer newParentKey = internalNode.keys[mid]; // 内部节点的 node 分裂 Node[] halfPointers = splitChildPointers(internalNode, mid); // 内部节点的 key 分裂 Integer[] halfKeys = splitKeys(internalNode.keys, mid); // 分裂后内部节点的子节点个数 internalNode.curNodesNum = linearNullSearch(internalNode.childPointers); InternalNode sibling = new InternalNode(this.m, halfKeys, halfPointers); for (Node pointer : halfPointers) { if (pointer != null) { pointer.parent = sibling; } } sibling.rightSibling = internalNode.rightSibling; internalNode.rightSibling = sibling; sibling.leftSibling = internalNode; if (sibling.rightSibling != null) { sibling.rightSibling.leftSibling = sibling; } // root node if (parent == null) { Integer[] keys = new Integer[this.m]; keys[0] = newParentKey; InternalNode newRoot = new InternalNode(this.m, keys); newRoot.appendChildPointer(internalNode); newRoot.appendChildPointer(sibling); this.root = newRoot; internalNode.parent = newRoot; sibling.parent = newRoot; } else { parent.keys[parent.curNodesNum - 1] = newParentKey; Arrays.sort(parent.keys, 0, parent.curNodesNum); int index = parent.getIndexOfPointer(internalNode) + 1; parent.insertChildPointer(sibling, index); sibling.parent = parent; }}private Node[] splitChildPointers(InternalNode node, int split) { Node[] pointers = node.childPointers; Node[] newPointers = new Node[this.m + 1]; for (int i = split + 1; i < pointers.length; i++) { newPointers[i - split - 1] = pointers[i]; node.removePointer(i); } return newPointers;}private Integer[] splitKeys(Integer[] keys, int split) { Integer[] newKeys = new Integer[m]; keys[split] = null; for (int i = split + 1; i < keys.length; i++) { newKeys[i - split] = keys[i]; keys[i] = null; } return newKeys;}
B+树的删除操作仅在叶子节点进行:
- 若删除后,叶子节点中的索引个数仍然满足要求即大于等于
Math.ceil(m / 2)
时,将该叶子节点的其他索引左移一位,删除结束;- 若删除后,叶子节点中的索引个数不满足最低要求,则查询左右兄弟节点:
- 若左/右兄弟节点中索引个数大于
Math.ceil(m / 2)
,则从左/右兄弟节点中移动一个索引项到当前叶子节点中,并修改父节点的索引值,删除结束- 若左/右兄弟节点中索引个数等于
Math.ceil(m / 2)
,则将左/右节点与当前节点合并,修改父节点的索引记录,并向上逐级判断内部节点是否因为页合并导致索引项不满足最低要求,删除结束
public void delete(int key) { if (isEmpty()) { System.out.println("Invalid: The B+ tree is empty!"); } else { LeafNode leafNode = this.root == null ? this.head : findLeafNode(key); int index = binarySearch(leafNode.entries, leafNode.curNum, key, null); if (index < 0) { System.out.println("Invalid: The key does not exist in the B+ tree!"); } else { leafNode.deleteAtIndex(index); // 删除后,叶子节点仍然满足要求,删除结束 if (leafNode.isValid()) { LeafNode sibling; InternalNode parent = leafNode.parent; // 删除后,叶子节点不满足要求,左兄弟节点可以移动一个索引项到当前叶子节点 if (leafNode.leftSibling != null && leafNode.leftSibling.parent == parent && leafNode.leftSibling.isAvailable()) { sibling = leafNode.leftSibling; Entry entry = sibling.entries[sibling.curNum - 1]; leafNode.insert(entry); sortEntries(leafNode.entries); sibling.deleteAtIndex(sibling.curNum - 1); // 更新 parent 的 key int pointIndex = getIndexOfLeafNode(parent.childPointers, leafNode); if (entry.key < parent.keys[pointIndex - 1]) { parent.keys[pointIndex - 1] = entry.key; } } // 删除后,叶子节点不满足要求,右兄弟节点可以移动一个索引项到当前叶子节点 else if (leafNode.rightSibling != null && leafNode.rightSibling.parent == parent && leafNode.rightSibling.isAvailable()) { sibling = leafNode.rightSibling; Entry entry = sibling.entries[0]; leafNode.insert(entry); sortEntries(leafNode.entries); sibling.deleteAtIndex(0); // 更新 parent 的 key int pointIndex = getIndexOfLeafNode(parent.childPointers, leafNode); if (entry.key > parent.keys[pointIndex]) { parent.keys[pointIndex] = entry.key; } } // 删除后,叶子节点不满足要求,左兄弟节点可以与当前叶子节点合并 else if (leafNode.leftSibling != null && leafNode.leftSibling.parent == parent && leafNode.leftSibling.isMergeable()) { sibling = leafNode.leftSibling; int pointIndex = getIndexOfLeafNode(parent.childPointers, leafNode); parent.removeKey(pointIndex - 1); parent.removePointer(leafNode); sibling.rightSibling = leafNode.rightSibling; if (parent.isValid()) { handleDeficiency(parent); } } // 删除后,叶子节点不满足要求,右兄弟节点可以与当前叶子节点合并 else if (leafNode.rightSibling != null && leafNode.rightSibling.parent == parent && leafNode.rightSibling.isMergeable()) { sibling = leafNode.rightSibling; int pointIndex = getIndexOfLeafNode(parent.childPointers, leafNode); parent.removeKey(pointIndex); parent.removePointer(leafNode); sibling.leftSibling = leafNode.leftSibling; if (sibling.leftSibling == null) { this.head = sibling; } // 逐级向上层判断是否满足要求 if (parent.isValid()) { handleDeficiency(parent); } } // 删除后,B+树为空 else if (this.root == null && this.head.curNum == 0) { this.head = null; } } } }}/** * 处理不满足要求的内部节点 * @param internalNode */private void handleInvalidInternalNode(InternalNode internalNode) { InternalNode sibling; InternalNode parent = internalNode.parent; // 当前内部节点为根节点 if (root == internalNode) { for (int i = 0; i < internalNode.childPointers.length; i++) { if (internalNode.childPointers[i] != null) { if (internalNode.childPointers[i] instanceof InternalNode) { root = (InternalNode) internalNode.childPointers[i]; root.parent = null; } else if (internalNode.childPointers[i] instanceof LeafNode) { root = null; } } } } // 左兄弟节点可以移动索引项 else if (internalNode.leftSibling != null && internalNode.leftSibling.isAvailable()) { sibling = internalNode.leftSibling; Integer key = sibling.keys[internalNode.curNodesNum - 2]; Node pointer = sibling.childPointers[internalNode.curNodesNum - 1]; shiftKeys(internalNode.keys, 1); shiftPointers(internalNode.childPointers, 1); internalNode.keys[0] = key; internalNode.childPointers[0] = pointer; sibling.removePointer(pointer); } // 右兄弟节点可以移动索引项 else if (internalNode.rightSibling != null && internalNode.rightSibling.isAvailable()) { sibling = internalNode.rightSibling; Integer key = sibling.keys[0]; Node pointer = sibling.childPointers[0]; internalNode.keys[internalNode.curNodesNum - 1] = parent.keys[0]; internalNode.childPointers[internalNode.curNodesNum] = pointer; parent.keys[0] = key; sibling.removePointer(0); shiftPointers(sibling.childPointers, -1); } // 左兄弟节点可以合并 else if (internalNode.leftSibling != null && internalNode.leftSibling.isMergeable()) { sibling = internalNode.leftSibling; int index = -1; for (int i = 0; i < parent.childPointers.length; i++) { if (parent.childPointers[i] == internalNode) { index = i; break; } } parent.keys[index - 1] = parent.keys[index]; for (int i = index; i < parent.keys.length - 1; i++) { parent.keys[i] = parent.keys[i + 1]; } shiftPointers(internalNode.childPointers, (int) Math.ceil(m / 2.0)); for (int i = 0; i < (int) Math.ceil(m / 2.0); i++) { internalNode.childPointers[i] = sibling.childPointers[i]; } internalNode.leftSibling = sibling.leftSibling; if (internalNode.leftSibling != null) { internalNode.leftSibling.rightSibling = internalNode; } if (parent != null && parent.isValid()) { handleInvalidInternalNode(parent); } } // 右兄弟节点可以合并 else if (internalNode.rightSibling != null && internalNode.rightSibling.isMergeable()) { sibling = internalNode.rightSibling; int index = -1; for (int i = 0; i < parent.childPointers.length; i++) { if (internalNode == parent.childPointers[i]) { index = i; break; } } parent.keys[index] = parent.keys[index + 1]; for (int i = index + 2; i < parent.keys.length; i++) { parent.keys[i - 1] = parent.keys[i]; } for (int i = 0; i < (int) Math.ceil(m / 2.0); i++) { internalNode.childPointers[internalNode.curNodesNum++] = sibling.childPointers[i]; } internalNode.rightSibling = sibling.rightSibling; if (internalNode.rightSibling != null) { internalNode.rightSibling.leftSibling = internalNode; } if (parent != null && parent.isValid()) { handleInvalidInternalNode(parent); } }}
参考文章:
1. B+树
关键词: