雪花算法


雪花算法

雪花算法(Snowflake)是分布式系统中生成全局唯一、有序递增ID 的经典方案,核心是通过 64 位 Long 型数据按位拆分字段,兼顾唯一性、有序性和高效性。

1. 核心结构(64 位 Long 型)

雪花算法的 ID 结构按位划分,不同实现可能微调字段长度,主流标准如下:

  • 第 1 位(符号位):固定为 0,确保 ID 为正数(Long 型符号位 0 表示正,1 表示负)。

  • 第 2-42 位(时间戳位):共 41 位,存储

相对于固定起始时间的毫秒数

  • 41 位可表示 2^41-1 毫秒,约 69 年,满足大部分系统生命周期需求。

  • 第 43-52 位(机器 ID 位):共 10 位,用于区分不同节点(服务器、进程)。

  • 10 位可支持 2^10=1024 个节点,适配中小型分布式集群。

  • 第 53-64 位(序列号位):共 12 位,记录同一毫秒内的生成顺序。

  • 12 位可支持每个节点每毫秒生成 2^12=4096 个 ID,应对高并发场景。


2. 工作原理

  1. 时间戳递增:ID 中的时间戳部分随系统时间毫秒级递增,保证 ID 整体有序。
  2. 机器 ID 唯一:提前为每个节点分配唯一的机器 ID(如通过配置文件、注册中心分配),避免跨节点 ID 冲突。
  3. 序列号循环:同一毫秒内多次生成 ID 时,序列号从 0 开始递增;跨毫秒时,序列号重置为 0。

3. 核心优势

  • 全局唯一:通过 “时间戳 + 机器 ID + 序列号” 三重维度确保无重复 ID。
  • 有序递增:时间戳在前,同一节点生成的 ID 按时间顺序递增,便于数据库索引优化。
  • 无中心依赖:无需集群协调(如数据库、Redis),本地生成 ID,性能极高(单节点每秒可生成百万级 ID)。
  • 占用空间小:64 位 Long 型存储,便于传输和存储。

4. 潜在局限与解决方案

  • 时钟回拨问题:若节点系统时间回调(如同步 NTP 时间),可能生成重复 ID。
  • 解决方案:检测到时钟回拨时,暂停 ID 生成直至时间追平,或使用上次生成的最大时间戳。
  • 机器 ID 分配:需提前规划机器 ID,避免重复分配。
  • 解决方案:通过配置中心统一分配,或结合节点 IP、端口计算生成。
  • 生命周期限制:41 位时间戳仅支持约 69 年,需提前规划起始时间或后期扩展字段。

5. 代码实现

import time
import threading

class Snowflake:
    """
    雪花算法生成器:生成64位全局唯一ID
    结构:1位符号位(0) + 41位时间戳 + 10位机器ID + 12位序列号
    """
    def __init__(self, machine_id: int, start_time: float = 1577808000000):
        """
        初始化雪花算法生成器
        :param machine_id: 机器ID(0-1023,10位最大支持1024台机器)
        :param start_time: 起始时间戳(毫秒),默认2020-01-01 00:00:00的毫秒级时间戳
        """
        # 校验机器ID合法性(10位最大为2^10-1=1023)
        if machine_id < 0 or machine_id > 1023:
            raise ValueError("机器ID必须在0-1023之间")

        self.machine_id = machine_id  # 机器ID
        self.start_time = start_time  # 起始时间戳(毫秒)

        # 位偏移量定义
        self.timestamp_bits = 41  # 时间戳位数
        self.machine_bits = 10    # 机器ID位数
        self.sequence_bits = 12   # 序列号位数

        # 最大取值计算(位运算:n位最大为2^n -1)
        self.max_sequence = (1 << self.sequence_bits) - 1  # 4095
        self.max_timestamp = (1 << self.timestamp_bits) - 1  # 2^41-1

        # 偏移量(用于拼接各部分)
        self.machine_shift = self.sequence_bits  # 机器ID需要左移12位
        self.timestamp_shift = self.machine_bits + self.sequence_bits  # 时间戳需要左移22位

        # 状态变量(记录上一次生成ID的信息)
        self.last_timestamp = 0  # 上一次时间戳
        self.sequence = 0  # 当前毫秒内的序列号

        # 线程锁(保证多线程环境下序列号安全)
        self.lock = threading.Lock()

    def _get_current_timestamp(self) -> int:
        """获取当前时间戳(毫秒)"""
        return int(time.time() * 1000)

    def generate_id(self) -> int:
        """生成一个雪花ID"""
        with self.lock:  # 加锁保证线程安全
            current_timestamp = self._get_current_timestamp()

            # 1. 处理时钟回拨(当前时间 < 上一次时间)
            if current_timestamp < self.last_timestamp:
                # 计算回拨毫秒数,这里选择等待时间追平(也可抛出异常)
                sleep_ms = self.last_timestamp - current_timestamp
                time.sleep(sleep_ms / 1000)  # 转换为秒
                current_timestamp = self._get_current_timestamp()  # 重新获取时间

                # 若仍回拨,说明时间异常,抛出错误
                if current_timestamp < self.last_timestamp:
                    raise RuntimeError(f"时钟回拨异常,回拨{self.last_timestamp - current_timestamp}毫秒")

            # 2. 处理同一毫秒内的序列号
            if current_timestamp == self.last_timestamp:
                self.sequence += 1
                # 序列号超出最大值(4095),等待到下一个毫秒
                if self.sequence > self.max_sequence:
                    # 循环等待至下一毫秒
                    while current_timestamp <= self.last_timestamp:
                        current_timestamp = self._get_current_timestamp()
                    self.sequence = 0  # 重置序列号
            else:
                # 跨毫秒,重置序列号
                self.sequence = 0

            # 3. 更新上一次时间戳
            self.last_timestamp = current_timestamp

            # 4. 计算相对时间戳(当前时间 - 起始时间)
            relative_timestamp = current_timestamp - self.start_time
            if relative_timestamp > self.max_timestamp:
                raise RuntimeError("雪花算法已过期(超过41位时间戳上限)")

            # 5. 拼接ID:时间戳 << 22 | 机器ID << 12 | 序列号
            snowflake_id = (
                (relative_timestamp << self.timestamp_shift)
                | (self.machine_id << self.machine_shift)
                | self.sequence
            )
            return snowflake_id

if __name__ == "__main__":
    # 初始化生成器(机器ID=1,使用默认起始时间)
    snowflake = Snowflake(machine_id=1)

    # 生成10个ID并打印
    for _ in range(10):
        sid = snowflake.generate_id()
        print(f"雪花ID: {sid}(长度:{len(str(sid))}位)")

    # 多线程测试(验证线程安全)
    def generate_in_thread():
        for _ in range(5):
            print(f"线程{threading.current_thread().name}生成ID: {snowflake.generate_id()}")

    threads = [threading.Thread(target=generate_in_thread, name=f"thread-{i}") for i in range(3)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

0 条评论

发表评论

暂无评论,欢迎发表您的观点!