Featured image of post 百度 UidGenerator 源码解析

百度 UidGenerator 源码解析

简介先来看一下官方介绍:雪花算法“雪花算法(Snowflake)是一种生成分布式全局唯一 ID 的算法,生成

简介

先来看一下官方介绍:

雪花算法

雪花算法(Snowflake)是一种生成分布式全局唯一 ID 的算法,生成的 ID 称为 Snowflake IDs 或 snowflakes。这种算法由 Twitter 创建,并用于推文的 ID。Discord 和 Instagram 等其他公司采用了修改后的版本。一个 Snowflake ID 有 64 位。前 41 位是时间戳,表示了自选定的时期以来的毫秒数。接下来的 10 位代表计算机 ID,防止冲突。其余 12 位代表每台机器上生成 ID 的序列号,这允许在同一毫秒内创建多个 Snowflake ID。SnowflakeID 基于时间生成,故可以按时间排序。此外,一个 ID 的生成时间可以由其自身推断出来,反之亦然。该特性可以用于按时间筛选 ID,以及与之联系的对象。

Image

第 1 位

该位不用主要是为了保持 ID 的自增特性,若使用了最高位,int64 会表示为负数。在 Java 中由于 long 类型的最高位是符号位,正数是 0,负数是 1,一般生成的 ID 为正整数,所以最高位为 0

41 位时间戳

毫秒级的时间,一般实现上不会存储当前的时间戳,而是时间戳的差值(当前时间减去固定的开始时间),这样可以使产生的 ID 从更小值开始;

41 bit 可以表示的数字多达 2^41 - 1,也就是可以标识 2 ^ 41 - 1 个毫秒值,换算成年就是表示 69 年的时间。

(1L « 41) / (1000L 60 60 24 365) = (2199023255552 / 31536000000) ≈ 69.73 年。

10 位工作机器 ID

Twitter 实现中使用前 5 位作为数据中心标识,后 5 位作为机器标识,可以部署 1024 (2^10)个节点。意思就是最多代表 2 ^ 5 个机房(32 个机房),每个机房里可以代表 2 ^ 5 个机器(32 台机器)。具体的分区可以根据自己的需要定义。比如拿出 4 位标识业务号,其他 6 位作为机器号。

12 位序列号

支持同一毫秒内同一个节点可以生成 4096 (2^12)个 ID,也就是同一毫秒内同一台机器所生成的最大 ID 数量为 4096。

简单来说,你的某个服务假设要生成一个全局唯一 id,那么就可以发送一个请求给部署了 SnowFlake 算法的系统,由这个 SnowFlake 算法系统来生成唯一 id。这个 SnowFlake 算法系统首先肯定是知道自己所在的机器号,(这里姑且讲 10bit 全部作为工作机器 ID)接着 SnowFlake 算法系统接收到这个请求之后,首先就会用二进制位运算的方式生成一个 64 bit 的 long 型 id,64 个 bit 中的第一个 bit 是无意义的。接着用当前时间戳(单位到毫秒)占用 41 个 bit,然后接着 10 个 bit 设置机器 id。最后再判断一下,当前这台机房的这台机器上这一毫秒内,这是第几个请求,给这次生成 id 的请求累加一个序号,作为最后的 12 个 bit。

优点:

  • 理论上 Snowflake 方案的 QPS 约为 409.6w/s(1000 * 2^12),这种分配方式可以保证在任何一个 IDC 的任何一台机器在任意毫秒内生成的 ID 都是不同的。

缺点

  • 强依赖机器时钟,如果机器上时钟回拨,会导致发号重复或者服务会处于不可用状态。

UidGenerator

UidGenerator 是 Java 实现的,基于 Snowflake 算法的唯一 ID 生成器。UidGenerator 以组件形式工作在应用项目中,支持自定义 workerId 位数和初始化策略,从而适用于 docker 等虚拟化环境下实例自动重启、漂移等场景。在实现上,UidGenerator 通过借用未来时间来解决 sequence 天然存在的并发限制;采用 RingBuffer 来缓存已生成的 UID, 并行化 UID 的生产和消费,同时对 CacheLine 补齐,避免了由 RingBuffer 带来的硬件级「伪共享」问题。最终单机 QPS 可达 600 万。

Image

UidGenerator 的实现跟 SnowFlake 原始算法不太一样,不过以下参数均可通过 Spring 进行自定义:

  • sign(1bit) 固定 1bit 符号标识,即生成的 UID 为正数。
  • delta seconds (28 bits) 当前时间,相对于时间基点"2016-05-20"的增量值,单位:秒,最多可支持约 8.7 年
  • worker id (22 bits) 机器 id,最多可支持约 420w 次机器启动。内置实现为在启动时由数据库分配,默认分配策略为用后即弃,后续可提供复用策略。
  • sequence (13 bits) 每秒下的并发序列,13 bits 可支持每秒 8192 个并发。

RingBuffer 环形数组,数组每个元素成为一个 slot。RingBuffer 容量,默认为 Snowflake 算法中 sequence 最大值,且为 2^N。可通过 boostPower 配置进行扩容,以提高 RingBuffer 读写吞吐量。

Tail 指针、Cursor 指针用于环形数组上读写 slot:

  • Tail 指针 表示 Producer 生产的最大序号(此序号从 0 开始,持续递增)。Tail 不能超过 Cursor,即生产者不能覆盖未消费的 slot。当 Tail 已赶上 curosr,此时可通过 rejectedPutBufferHandler 指定 PutRejectPolicy

  • Cursor 指针 表示 Consumer 消费到的最小序号(序号序列与 Producer 序列相同)。Cursor 不能超过 Tail,即不能消费未生产的 slot。当 Cursor 已赶上 tail,此时可通过 rejectedTakeBufferHandler 指定 TakeRejectPolicy

CachedUidGenerator 采用了双 RingBuffer,Uid-RingBuffer 用于存储 Uid、Flag-RingBuffer 用于存储 Uid 状态 (是否可填充、是否可消费)

Image

由于数组元素在内存中是连续分配的,可最大程度利用 CPU cache 以提升性能。但同时会带来「伪共享」FalseSharing 问题,为此在 Tail、Cursor 指针、Flag-RingBuffer 中采用了 CacheLine 补齐方式。

Image

关于更多伪共享的知识,可以参考:https://www.cnblogs.com/cyfonly/p/5800758.html,

总结来说,伪共享会导致性能问题,解决了能提升性能,就算不解决也不会出现数据不一致等严重的问题。

RingBuffer 填充时机

  • 初始化预填充 RingBuffer 初始化时,预先填充满整个 RingBuffer.

  • 即时填充 Take 消费时,即时检查剩余可用 slot 量 (tail - cursor),如小于设定阈值,则补全空闲 slots。阈值可通过 paddingFactor 来进行配置

  • 周期填充 通过 Schedule 线程,定时补全空闲 slots。可通过 scheduleInterval 配置,以应用定时填充功能,并指定 Schedule 时间间隔

源码分析

概况

整个项目共 2386 行 java 代码 Image

代码内部 class 的依赖结构是这样的:

Image

可见 RingBuffer 是个核心类。

目录结构

 1  com
 2    └── baidu
 3        └── fsg
 4            └── uid
 5                ├── BitsAllocator.java                  - Bit 分配器 (C)
 6                ├── UidGenerator.java                   - UID 生成的接口 (I)
 7                ├── buffer
 8                │   ├── BufferPaddingExecutor.java      - 填充 RingBuffer 的执行器 (C)
 9                │   ├── BufferedUidProvider.java        - RingBuffer 中 UID 的提供者 (C)
10                │   ├── RejectedPutBufferHandler.java   - 拒绝 Put 到 RingBuffer 的处理器 (C)
11                │   ├── RejectedTakeBufferHandler.java  - 拒绝从 RingBuffer 中 Take 的处理器 (C)
12                │   └── RingBuffer.java                 - 内含两个环形数组 (C)
13                ├── exception
14                │   └── UidGenerateException.java       - 运行时异常
15                ├── impl
16                │   ├── CachedUidGenerator.java         - RingBuffer 存储的 UID 生成器 (C)
17                │   └── DefaultUidGenerator.java        - 无 RingBuffer 的默认 UID 生成器 (C)
18                ├── utils
19                │   ├── DateUtils.java
20                │   ├── DockerUtils.java
21                │   ├── EnumUtils.java
22                │   ├── NamingThreadFactory.java
23                │   ├── NetUtils.java
24                │   ├── PaddedAtomicLong.java
25                │   └── ValuedEnum.java
26                └── worker
27                    ├── DisposableWorkerIdAssigner.java  - 用完即弃的 WorkerId 分配器 (C)
28                    ├── WorkerIdAssigner.java            - WorkerId 分配器 (I)
29                    ├── WorkerNodeType.java              - 工作节点类型 (E)
30                    ├── dao
31                    │   └── WorkerNodeDAO.java           - MyBatis Mapper
32                    └── entity
33                        └── WorkerNodeEntity.java        - MyBatis Entity

DefaultUidGenerator

UidGenerator 在应用中是以 Spring 组件的形式提供服务,DefaultUidGenerator 提供了最简单的 Snowflake 式的生成模式,没有使用任何缓存来预存 UID,在需要生成 ID 的时候即时进行计算。

所以我们结合源码来串一下最简单的默认生成模式的流程。

首先引入DefaultUidGenerator配置

 1<!-- DefaultUidGenerator -->
 2<bean id="defaultUidGenerator" class="com.baidu.fsg.uid.impl.DefaultUidGenerator" lazy-init="false">
 3    <property name="workerIdAssigner" ref="disposableWorkerIdAssigner"/>
 4
 5    <!--当前时间位数,前文图上是 28 位,这里设置 29 位 -->
 6    <property name="timeBits" value="29"/>
 7    <!-- 机器 id 位数,设置 21 位 -->
 8    <property name="workerBits" value="21"/>
 9    <!-- 每秒下的并发序列位数,13 bits 可支持每秒 8192 个并发 -->
10    <property name="seqBits" value="13"/>
11    <!-- 相对于时间基点"2016-09-20"的增量值,单位是秒 -->
12    <!-- 用于计算时间戳的差值(当前时间减去固定的开始时间),这样可以使产生的 ID 从更小值开始-->
13    <property name="epochStr" value="2016-09-20"/>
14</bean>
15 
16<!-- 用完即弃的 WorkerIdAssigner,依赖 DB 操作 -->
17<bean id="disposableWorkerIdAssigner" class="com.baidu.fsg.uid.worker.DisposableWorkerIdAssigner" />

配置文件中的配置我都作了注释,这里重点说两个属性:

epochStr

是给一个过去时间的字符串,作为时间基点,比如"2016-09-20",用于计算时间戳的差值(当前时间减去固定的开始时间),这样可以使产生的 ID 从更小值开始。这点从以下的两段源码可以看出来:

 1public void setEpochStr(String epochStr) {
 2    if (StringUtils.isNotBlank(epochStr)) {
 3        this.epochStr = epochStr;
 4        this.epochSeconds = TimeUnit.MILLISECONDS.toSeconds(DateUtils.parseByDayPattern(epochStr).getTime());
 5    }
 6}
 7 /**
 8  * Get current second
 9  */
10private long getCurrentSecond() {
11    long currentSecond = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
12    if (currentSecond - epochSeconds > bitsAllocator.getMaxDeltaSeconds()) {
13        throw new UidGenerateException("Timestamp bits is exhausted. Refusing UID generate. Now: " + currentSecond);
14    }
15
16    return currentSecond;
17}

disposableWorkerIdAssigner

Worker ID 分配器,用于为每个工作机器分配一个唯一的 ID,目前来说是用完即弃,在初始化 Bean 的时候会自动向 MySQL 中插入一条关于该服务的启动信息,待 MySQL 返回其自增 ID 之后,使用该 ID 作为工作机器 ID 并柔和到 UID 的生成当中。

 1@Transactional
 2public long assignWorkerId() {
 3    // build worker node entity
 4    WorkerNodeEntity workerNodeEntity = buildWorkerNode();
 5
 6    // add worker node for new (ignore the same IP + PORT)
 7    workerNodeDAO.addWorkerNode(workerNodeEntity);
 8    LOGGER.info("Add worker node:" + workerNodeEntity);
 9
10    return workerNodeEntity.getId();
11}

buildWorkerNode() 为获取该启动服务的信息,兼容 Docker 服务。但要注意,无论是 docker 还是用 k8s,需要添加相关的环境变量 env 在配置文件中以便程序能够获取到。

1 /** Environment param keys 主要是端口和 host */
2    private static final String ENV_KEY_HOST = "JPAAS_HOST";
3    private static final String ENV_KEY_PORT = "JPAAS_HTTP_PORT";

核心方法

介绍完上面这些,我们来看下 defaultUidGenerator 生成 ID 的核心方法(注意这个方法是同步方法)

 1 protected synchronized long nextId() {
 2
 3    long currentSecond = getCurrentSecond();
 4
 5    // 时钟向后移动,拒绝生成 id (解决时钟回拨问题)
 6    if (currentSecond < lastSecond) {
 7        long refusedSeconds = lastSecond - currentSecond;
 8        throw new UidGenerateException("Clock moved backwards. Refusing for %d seconds", refusedSeconds);
 9    }
10
11    // 如果是在同一秒内,那么增加 sequence
12    if (currentSecond == lastSecond) {
13        sequence = (sequence + 1) & bitsAllocator.getMaxSequence();
14        // 如果超过了最大值,那么需要等到下一秒再进行生成
15        if (sequence == 0) {
16            currentSecond = getNextSecond(lastSecond);
17        }
18
19    // 不同秒的情况下,sequence 重新从 0 开始计数
20    } else {
21        sequence = 0L;
22    }
23    lastSecond = currentSecond;
24    // Allocate bits for UID
25    return bitsAllocator.allocate(currentSecond - epochSeconds, workerId, sequence);
26}

可以看到大部分代码用来处理异常情况,比如时钟回拨问题,这里的做法比较简单,就是直接抛出异常。

最后一行才是根据传入的或计算好的参数进行 ID 的真正分配,通过二进制的移位和或运算得到最终的 long ID 值。

1 public long allocate(long deltaSeconds, long workerId, long sequence) {
2
3    return (deltaSeconds << timestampShift) | (workerId << workerIdShift) | sequence;
4}

CachedUidGenerator

CachedUidGenerator 是一个使用 RingBuffer 预先缓存 UID 的生成器,在初始化时就会填充整个 RingBuffer,并在 take() 时检测到少于指定的填充阈值之后就会异步地再次填充 RingBuffer(默认值为 50%),另外可以启动一个定时器周期性检测阈值并及时进行填充。

RingBuffer

上文提到 RingBuffer 是预先缓存 UID 的生成器,我们先看下它的成员变量情况:

 1    /** 常量配置 */
 2    private static final int START_POINT = -1;
 3    private static final long CAN_PUT_FLAG = 0L;
 4    private static final long CAN_TAKE_FLAG = 1L;
 5    public static final int DEFAULT_PADDING_PERCENT = 50;
 6
 7    /** RingBuffer 的 slot 的大小,每个 slot 持有一个 UID */
 8    private final int bufferSize;
 9    private final long indexMask;
10    /** 存 UID 的数组 */
11    private final long[] slots;
12    /** 存放 UID 状态的数组(是否可读或者可写,或是否可填充、是否可消费) */
13    private final PaddedAtomicLong[] flags;
14
15    /** Tail: 要产生的最后位置序列 */
16    private final AtomicLong tail = new PaddedAtomicLong(START_POINT);
17    /** Cursor: 要消耗的当前位置序列 */
18    private final AtomicLong cursor = new PaddedAtomicLong(START_POINT);
19
20    /** 触发填充缓冲区的阈值 */
21    private final int paddingThreshold; 
22    
23    /** 放置 缓冲区的拒绝策略 拒绝方式为打印日志 */
24    private RejectedPutBufferHandler rejectedPutHandler = this::discardPutBuffer;
25    /** 获取 缓冲区的拒绝策略 拒绝方式为抛出异常并打印日志 */
26    private RejectedTakeBufferHandler rejectedTakeHandler = this::exceptionRejectedTakeBuffer; 
27    
28    /** 填充缓冲区的执行者 */
29    private BufferPaddingExecutor bufferPaddingExecutor;

可以看到 RingBuffer 内部有两个环形数组,一个用来存放 UID,一个用来存放 UID 的状态,这两个数组的大小都是一样的,也就是 bufferSize

slots 用于存放 UID 的 long 类型数组,flags 的用于存放读写标识的 PaddedAtomicLong 类型数组。这什么用 PaddedAtomicLong?上文有提到过伪共享的概念,这里就是为了解决这个问题,如果对 伪共享 还不太理解的朋友可以看一下上文的参考链接理解一下。

简单讲,由于 slots 实质是属于多读少写的变量,所以使用原生类型的收益更高。而 flags 则是会频繁进行写操作,为了避免伪共享问题所以手工进行补齐。

RingBuffer 构造方法

 1
 2   /**
 3     * 具有缓冲区大小的构造函数,paddingFactor 默认为 {@value #DEFAULT_PADDING_PERCENT}
 4     * 
 5     * @param bufferSize 必须是正数和 2 的幂
 6     */
 7    public RingBuffer(int bufferSize) {
 8        this(bufferSize, DEFAULT_PADDING_PERCENT);
 9    }
10    
11    /**
12     * 具有缓冲区大小和填充因子的构造函数
13     * 
14     * @param bufferSize 必须是正数和 2 的幂
15     * @param paddingFactor (0 - 100) 中的百分比。当剩余可用的 UID 数量达到阈值时,将触发填充缓冲区
16     *        Sample: paddingFactor=20, bufferSize=1000 -> threshold=1000 * 20 /100,
17     *        当 tail-cursor<threshold 时将触发填充缓冲区
18     */
19    public RingBuffer(int bufferSize, int paddingFactor) {
20        // check buffer size is positive & a power of 2; padding factor in (0, 100)
21        Assert.isTrue(bufferSize > 0L, "RingBuffer size must be positive");
22        Assert.isTrue(Integer.bitCount(bufferSize) == 1, "RingBuffer size must be a power of 2");
23        Assert.isTrue(paddingFactor > 0 && paddingFactor < 100, "RingBuffer size must be positive");
24
25        this.bufferSize = bufferSize;
26        this.indexMask = bufferSize - 1;
27        this.slots = new long[bufferSize];
28        this.flags = initFlags(bufferSize);
29        
30        this.paddingThreshold = bufferSize * paddingFactor / 100;
31    }

bufferSize 的默认值 ,如果 sequence 是 13 位,那么默认最大值是 8192,且是支持扩容的。

触发填充缓冲区的阈值也是支持配置的,

1<!-- RingBuffer size 扩容参数,可提高 UID 生成的吞吐量。--> 
2<!-- 默认:3, 原 bufferSize=8192, 扩容后 bufferSize= 8192 << 3 = 65536 -->
3<property name="boostPower" value="3"></property>
4
5<!-- 指定何时向 RingBuffer 中填充 UID, 取值为百分比 (0, 100), 默认为 50 -->
6<!-- 举例:bufferSize=1024, paddingFactor=50 -> threshold=1024 * 50 / 100 = 512. -->
7<!-- 当环上可用 UID 数量 < 512 时,将自动对 RingBuffer 进行填充补全 -->
8<property name="paddingFactor" value="50"></property>

RingBuffer 的填充和获取

RingBuffer 的填充和获取操作是线程安全的,但是填充和获取操作的性能会受到 RingBuffer 的大小的影响,先来看下 put 操作:

 1 public synchronized boolean put(long uid) {
 2        long currentTail = tail.get();
 3        long currentCursor = cursor.get();
 4
 5        // 当 tail 追上了 cursor 时,表示 RingBuffer 满了,不能再放了
 6        // Tail 不能超过 Cursor,即生产者不能覆盖未消费的 slot。当 Tail 已赶上 curosr,此时可通过 rejectedPutBufferHandler 指定 PutRejectPolicy
 7        long distance = currentTail - (currentCursor == START_POINT ? 0 : currentCursor);
 8        if (distance == bufferSize - 1) {
 9            rejectedPutHandler.rejectPutBuffer(this, uid);
10            return false;
11        }
12
13        // 1. 预检查 flag 是否为 CAN_PUT_FLAG,首次 put 时,currentTail 为-1
14        int nextTailIndex = calSlotIndex(currentTail + 1);
15        if (flags[nextTailIndex].get() != CAN_PUT_FLAG) {
16            rejectedPutHandler.rejectPutBuffer(this, uid);
17            return false;
18        }
19        // 2. put UID in the next slot
20        slots[nextTailIndex] = uid;
21        // 3. update next slot' flag to CAN_TAKE_FLAG
22        flags[nextTailIndex].set(CAN_TAKE_FLAG);
23        // 4. publish tail with sequence increase by one 移动 tail
24        tail.incrementAndGet();
25
26        
27        // 上述操作的原子性由“synchronized”保证。换句话说
28        // take 操作不能消费我们刚刚放的 UID,直到 tail 移动 (tail.incrementAndGet()) 才可以
29        return true;
30    }

注意 put 方法是 synchronized。再来看下 take 方法:

UID 的读取是一个无锁的操作。在获取 UID 之前,还要检查是否达到了 padding 阈值,在另一个线程中会触发 padding buffer 操作,如果没有更多可用的 UID 可以获取,则应用指定的 RejectedTakeBufferHandler

 1 public long take() {
 2    // spin get next available cursor
 3    long currentCursor = cursor.get();
 4    // cursor 初始化为-1,现在 cursor 等于 tail,所以初始化时 nextCursor 为-1
 5    long nextCursor = cursor.updateAndGet(old -> old == tail.get() ? old : old + 1);
 6
 7    // check for safety consideration, it never occurs
 8    // 初始化或者全部 UID 耗尽时 nextCursor == currentCursor
 9    Assert.isTrue(nextCursor >= currentCursor, "Curosr can't move back");
10
11    // 如果达到阈值,则以异步模式触发填充
12    long currentTail = tail.get();
13    if (currentTail - nextCursor < paddingThreshold) {
14        LOGGER.info("Reach the padding threshold:{}. tail:{}, cursor:{}, rest:{}", paddingThreshold, currentTail,
15                nextCursor, currentTail - nextCursor);
16        bufferPaddingExecutor.asyncPadding();
17    }
18
19    // cursor 追上 tail ,意味着没有更多可用的 UID 可以获取
20    if (nextCursor == currentCursor) {
21        rejectedTakeHandler.rejectTakeBuffer(this);
22    }
23
24    // 1. check next slot flag is CAN_TAKE_FLAG
25    int nextCursorIndex = calSlotIndex(nextCursor);
26    // 这个位置必须要是可以 TAKE
27    Assert.isTrue(flags[nextCursorIndex].get() == CAN_TAKE_FLAG, "Curosr not in can take status");
28
29    // 2. get UID from next slot
30    // 3. set next slot flag as CAN_PUT_FLAG.
31    long uid = slots[nextCursorIndex];
32    // 告知 flags 数组这个位置是可以被重用了
33    flags[nextCursorIndex].set(CAN_PUT_FLAG);
34
35    // 注意:步骤 2,3 不能互换。如果我们在获取 slot 的值之前设置 flag,生产者可能会用新的 UID 覆盖 slot,这可能会导致消费者在一个 ring 中  获取 UID 两次
36    return uid;
37 }

BufferPaddingExecutor

默认情况下,slots 被消费大于 50%的时候进行异步填充,这个填充由 BufferPaddingExecutor 所执行的,下面我们马上看看这个执行者的主要代码。

 1 /**
 2    * Padding buffer fill the slots until to catch the cursor
 3    * 该方法被即时填充和定期填充所调用
 4    */
 5public void paddingBuffer() {
 6    LOGGER.info("Ready to padding buffer lastSecond:{}. {}", lastSecond.get(), ringBuffer);
 7
 8    // is still running
 9    // 这个是代表填充 executor 在执行,不是 RingBuffer 在执行。避免多个线程同时扩容。
10    if (!running.compareAndSet(false, true)) {
11        LOGGER.info("Padding buffer is still running. {}", ringBuffer);
12        return;
13    }
14
15    // fill the rest slots until to catch the cursor
16    boolean isFullRingBuffer = false;
17    while (!isFullRingBuffer) {
18        // 填充完指定 SECOND 里面的所有 UID,直至填满
19        List<Long> uidList = uidProvider.provide(lastSecond.incrementAndGet());
20        for (Long uid : uidList) {
21            isFullRingBuffer = !ringBuffer.put(uid);
22            if (isFullRingBuffer) {
23                break;
24            }
25        }
26    }
27
28    // not running now
29    running.compareAndSet(true, false);
30    LOGGER.info("End to padding buffer lastSecond:{}. {}", lastSecond.get(), ringBuffer);
31}
  • 当线程池分发多条线程来执行填充任务的时候,成功抢夺运行状态的线程会真正执行对 RingBuffer 填充,直至全部填满,其他抢夺失败的线程将会直接返回。

  • 该类还提供定时填充功能,如果有设置开关则会生效,默认不会启用周期性填充

  • RIngBuffer 的填充时机有 3 个:CachedUidGenerator 时对 RIngBuffer 初始化、RIngBuffer#take() 时检测达到阈值和周期性填充(如果有打开)

使用 RingBuffer 的 UID 生成器

最后我们看一下利用 CachedUidGenerator 生成 UID 的代码,CachedUidGenerator 继承了 DefaultUidGenerator,实现了 UidGenerator 接口。

该类在应用中作为 Spring Bean 注入到各个组件中,主要作用是初始化 RingBuffer 和 BufferPaddingExecutor。最重要的方法为 BufferedUidProvider 的提供者,即 lambda 表达式中的 nextIdsForOneSecond(long) 方法。

 1 /**
 2     * Get the UIDs in the same specified second under the max sequence
 3     * 
 4     * @param currentSecond
 5     * @return UID list, size of {@link BitsAllocator#getMaxSequence()} + 1
 6     */
 7    protected List<Long> nextIdsForOneSecond(long currentSecond) {
 8        // Initialize result list size of (max sequence + 1)
 9        int listSize = (int) bitsAllocator.getMaxSequence() + 1;
10        List<Long> uidList = new ArrayList<>(listSize);
11
12        // Allocate the first sequence of the second, the others can be calculated with the offset
13        long firstSeqUid = bitsAllocator.allocate(currentSecond - epochSeconds, workerId, 0L);
14        for (int offset = 0; offset < listSize; offset++) {
15            uidList.add(firstSeqUid + offset);
16        }
17
18        return uidList;
19    }

获取 ID 是通过委托 RingBuffer 的 take() 方法达成的

1    @Override
2    public long getUID() {
3        try {
4            return ringBuffer.take();
5        } catch (Exception e) {
6            LOGGER.error("Generate unique id exception. ", e);
7            throw new UidGenerateException(e);
8        }
9    }

这里通过时序图再串一下 获取 id 的流程。

Image

几段位运算代码

判断是不是 2 的幂

利用 bitCount 函数,原理是计算参数传递的 int 值的二进制值有多少个 1,如果只有 1 个,则说明是 2 的幂,否则不是。

1Integer.bitCount(bufferSize) == 1

根据位数取最大值

1// initialize max value
2this.maxDeltaSeconds = ~(-1L << timestampBits);
3this.maxWorkerId = ~(-1L << workerIdBits);
4this.maxSequence = ~(-1L << sequenceBits);

比较有意思,是用负 1 的二进制先左移再取反,比如看一下 13 和-1 的二进制值就明白了:

1System.out.println(Long.toBinaryString(-1L));
2System.out.println(Long.toBinaryString(-1L << 13 ));
3System.out.println(Long.toBinaryString( ~(-1L << 13) ));
4
5输出
61111111111111111111111111111111111111111111111111111111111111111
71111111111111111111111111111111111111111111111111110000000000000
81111111111111

parse UID

与上面的有异曲同之处。

1 // parse UID
2long sequence = (uid << (totalBits - sequenceBits)) >>> (totalBits - sequenceBits);
3long workerId = (uid << (timestampBits + signBits)) >>> (totalBits - workerIdBits);
4long deltaSeconds = uid >>> (workerIdBits + sequenceBits);

参考

位旅人路过 次翻阅 初次见面