Contents

全局唯一ID服务化方案

一、背景

目前 snowflake 大ID的代码,通过库调用的方式在业务系统使用。这种调用方式是最快、最容易实现的。但同时存在缺点,因为是耦合在业务代码中,因此在适配多地区或者部署多个服务的情况下,因为服务不可动态配置原因,无法保证生成的 ID 的唯一性,也会导致代码的可维护性差

二、当前实现

1、snowflake ID 组成部分

	timestamp_bits   = 42  // 时间戳位数
	process_id_bits  = 9   // 进程ID位数
	sequence_id_bits = 12  // 序列号ID位数
  • 使用 42 bit 作为毫秒时间戳,表示范围 0 至 2^42 - 1
  • 使用 9 bit 表示进程ID
  • 使用 12 bit 表示同一毫秒内生成的序号ID(最多4096个) 以上三个加起来只有 63 bit,二进制中最高位为1的都是负数,因此没有使用 最终拼接的 snowflake ID 结构如下
<timestamp_bits:42bit><process_id_bits:9bit><sequence_id_bits:12bit>

2、生成ID算法

const (
	twepoch            = 1596364434706 // begin time 2020-08-02 18:33:54
	timestamp_bits     = 42            // 时间戳占 42 位, 如果需要表示时间更久,可延长位数
	datacenter_id_bits = 3             // 数据中心 id, 占 3 位,可表示 8 个数据中心
	process_id_bits    = 6             // 进程id(可理解为机器id), 占 6位,可表示 64 台机器
	sequence_id_bits   = 12            // 序列号,每次新增都自增 1
	max_timestamp      = 1 << timestamp_bits
	max_datacenter_id  = 1 << datacenter_id_bits
	max_process_id     = 1 << process_id_bits
	max_sequence_id    = 1 << sequence_id_bits
)

func make_snowflake(timestamp_ms int64, process_id int64, sequence_id int64) int64 {
	snowflakeid := ((timestamp_ms - twepoch) % max_timestamp) << process_id_bits << sequence_id_bits
	snowflakeid += (process_id % max_process_id) << sequence_id_bits
	snowflakeid += sequence_id % max_sequence_id
	return snowflakeid
}
  • 第一行,根据当前毫秒时间戳 - 起始毫秒时间戳,再通过 % 求余操作,保证不会超过最大时间戳,由于时间戳在高位,因此需要左移 process_id_bits + sequence_id_bits 位数,才能将时间戳放置到指定的 bit 位
  • 第二行,将process_id 通过 % 求余,保证不会超过最大 process_id, process_id 是中间位置的组成部分,因此也需要左移 sequence_id_bits 位数,才能将 process_id 放置到指定bit 位,然后通过 += 加法累加
  • 第三行,sequence_id 在最低位,因此只需要 % 求余,保证不会超过最大的 sequence_id,然后累加即可

3、具体流程步骤

3.1 一个服务需要使用到 snowflake ID,就需要在服务启动时,先初始化一个snowflake实例

func Init() {
	snowflakeetcd.Dialog()
	cnf := settings.Etcd()
	snowflakeetcd.Setup(cnf.Hosts)
	quesNameSpace := getQuestionSnowflakeNameSpace()
	questionSnowflake = snowflakeetcd.GetGenerator(quesNameSpace)
}

需要设置 namespace 命名空间做区分使用,如果有多个业务场景使用到大ID,并且两者之间没有关联的话,就可以通过指定 namespace 方式,实现不同ID段的管理,避免生成了重复的ID

3.2 注册 node 到 etcd 步骤

到 etcd 查询所有的 node 节点(一个节点表示一个服务的启用,如果一个服务实例化多个 snowflake 实例,那么就会有多个节点),下面是 etcd 的目录层级结构

/_snowflake_/nodes/<自定义>/1
<node_path>/<namespace>.<env>/<pid>

其中 pid 的生成有一定的逻辑判断,如果当前 node 目录下已经有节点

  • 从 etcd 取到最大的 max_pid, 如果 < max_process_id - 1 的话,那么新增的 pid = max_id += 1
  • 如果 max_id >= max_process_id - 1,会有一定的判断逻辑,具体见代码,不展开讲解

最后将实例化完成的 snowflake 放入 map 存储,避免多次实例化操作

3.3 生成大 ID 细节

func (sf *SnowFlake) gen() int64 {
	sf.lock.Lock()
	defer sf.lock.Unlock()
	for {
		ts := make_timestamp()
		seq := sf.getSequnce()
		if ts == sf.last_ts && seq <= sf.last_seq {
			time.Sleep(1 * time.Millisecond)
		} else {
			sid := make_snowflake(ts, sf.process_id, seq)
			sf.last_ts = ts
			sf.last_seq = seq
			return sid
		}
	}
}
  • 生成ID前先加锁
  • 生成新的序列号,sf.getSequnce(), 每次都是自增1
  • 如果当前生成的 ID 的时间戳与上一次生成的ID时间戳是一样,并且序列号和上一次的序列号是一样的,直接sleep 1 毫秒,避免生成重复的ID
  • 生成 ID 成功后,需要设置 last_ts 和 last_seq

三、大ID生成器新方案

新的大ID生成器方案会对之前 golang 的 snowflake 库进行改造,支持多地区生成唯一的 ID

3.1 改造 snowflake ID 的结构

<timestamp_bits:42bit><datacenter_id_bit:3bit><process_id_bit:6bit><sequence_id_bits:12bit>
  • 将之前的 process_id 字段拆分为两部分,3bit 表示数据中心ID,剩下的 6 bit 表示节点ID

3.2 snowflake 配置改造

之前 snowflake 不支持通过配置文件来配置相关信息,比如支持通过配置文件传递 datacenter_id 来保存多地区的使用,需要配置文件需要配置的格式

# 数据中心ID只有 3 bit表示范围需要限制在 [0, 1, 2, 3, 4, 5, 6, 7]
SNOWFLAKE_CONFIG:
  datacenter_id: 1
  datacenter: "cn" 
  • 由于 datacenter_id 只有 3 bit,因此在多地区部署时,需要修改相应的 id, 可供选择的 id 范围是 [0, 2^3-1], 也就是 [0, 1, 2, 3, 4, 5, 6, 7], 每个值表示一个数据中心
  • datacenter 尽量填写相关地区的英文简称,这个字段会在etcd 目录做数据中心的区分路径

3.3 初始化 snowflake 实例

	dataCenter := "sg"
	dataCenterID := int64(1)
	QuestionSnowflake = snowflakeetcd.GetGenerator(quesNameSpace, dataCenter, dataCenterID)

需要多传入 datacenter 和 datacenter_id 参数,作为不同地区初始化 snowflake 的初始条件

3.4 注册 etcd 的节点结构

/<自定义>/_snowflake_/nodes/<datacenter>/<namespace>.<env>/<process_id>

// 具体实现
/xxx/_snowflake_/nodes/cn/xxx.dev/1

3.5 snowflake id生成算法

func make_snowflake(timestamp_ms int64, datacenter_id, process_id int64, sequence_id int64) int64 {
	// 将时间戳左移到指定高位,移动(datacenter_id_bits + datacenter_id_bits + sequence_id_bits) 位
	snowflakeid := ((timestamp_ms - twepoch) % max_timestamp) << datacenter_id_bits << process_id_bits << sequence_id_bits
	// 将数据中心 ID 移动 process_id_bits + sequence_id_bits 位
	snowflakeid += (datacenter_id % max_datacenter_id) << process_id_bits << sequence_id_bits
	// 将 process_id 移动 sequence_id_bits 位
	snowflakeid += (process_id % max_process_id) << sequence_id_bits
	// 序列化在低位,不需要移位操作
	snowflakeid += sequence_id % max_sequence_id
	return snowflakeid
}

3.6 snowflake id 服务

snowflake 单独部署为一个服务,目前通过 http 协议提供生成大ID的接口,暂时没有使用 rpc 协议 该服务只提供一个接口,需要提供传参namespace, 1 表示 xxx, 2 表示 yyy

// POST: /api/snowflake/v1/gen/
// Request: 
{
    "namespace": 1
}
// Response:
{
    "message": "success",
    "code": 0,
    "data": {
        "snowflake_id": 107209837483659272,
        "datacenter_id": 1,
    }
}

3.7 访问 snowflake 服务

因为 snowflake 通过容器部署在k8s上,因此需要通过域名进行访问服务

  • 如果是腾讯云集群的话,可以通过 k8s service 提供的域名进行调用
  • 如果后期在内部 的 k8s 集群也部署同样的服务,需要申请内部域名进行调用(具体细节待定)

四、大ID服务无法提供服务的降级方案

考虑到 大 ID 服务是一个单独服务,有可能因为集群网络、服务故障等原因,导致该服务不可用,进而影响到正常的业务处理

降级步骤

  • 服务启动时,同时实例化本地的 snowflake 生成器,该生成器没有使用 datacenter_id 字段,而是将 9 个 bit 位全部给 process_id ,并且在启动时,随机生成一个 process_id
  • 请求 snowflake 服务超时 3s, 那么直接调用本地生成器生成 snowflake id(暂时不考虑本地生成的ID出现重复情况)
  • 如果一个调用方服务启用了 20个 pod, 那么会使用到 20 个随机 process_id (目前确认Python侧代码在服务启动时,并没有实例化 snowflake)

存在问题:

  • 因为每个服务都会有多个pod, 启动服务的时候初始化snowflake, 会占用很多 process_id 可以业务代码在snowflake 访问请求失败后,再尝试实例化本地snowflake, 不过这还得保证这个实例化是单例的

  • 本地启用 snowflake 生成器,写死了不使用 etcd, 如果需要启用etcd,就需要改代码 可以通过开关方式来设置是否启用 etcd,snowflake的基础代码支持对 etcd 的开关即可

五、新方案问题汇总

1、使用 http 提供服务,可能会导致获取新ID变得很慢(几十ms,之前可能就几ms内)

解答: 先验证一下 http 服务的整个链路所花费的时长,后续 rpc 可作为优化项

2、如果 snowflake 访问量过大,是否可以像无状态服务一样,对该服务进行扩容并提供服务

解答:snowflake 服务启动都会实例化一个节点,只要保证节点数不超过 2^6 个数即可,当前服务只有两个服务需要大ID,理论上是够用的,但是要确保 snowflake 服务不会启动过多pod(2^6 / 2)

3、确保 旧服务与新服务生成的 ID 不出现重复

解答:因为当前 etcd 的 /snowflake/nodes/ 下的节点都添加了 datacenter 前缀,因此如果与旧方案的生成 ID 算法一起运行会导致生成的 ID 一样,因为可能会有相同的 process_id 会被两个服务所使用。因此需要确保新的snowflake 服务被调用生成新 ID 前,所有业务服务也相应调用新的 snowflake 服务来获取新ID

4、时钟回拨问题,确保 snowflake 服务的时钟不要出现回拨情况

目前来说,容器内的时钟都是通过

5、如何确保 datacenter_id 不会出现用错情况(比如cn地区使用了sg地区的标识)

目前来说,通过配置文件进行区别,或者通过环境变量来设置数据中心标识

6、如何验证生成的ID是属于正确的数据中心的

比如 snowflake_id:  108152875544481803
将该 snowflake_id 转换成二进制如下:
         <timestamp>                        <数据中心id>   <机器ID>     <序列号>
0 000000110000000001111000111011110100000010   001        000001      000000010000
可以计算出,数据中心id的十进制是 1

完整代码

type SnowFlake struct {
	sequence      int64
	last_ts       int64
	last_seq      int64
	datacenter_id int64
	process_id    int64
	lock          sync.RWMutex
}

func (sf *SnowFlake) getSequnce() int64 {
	sf.sequence += 1
	sf.sequence %= max_sequence_id
	return sf.sequence
}

func (sf *SnowFlake) gen() int64 {
	sf.lock.Lock()
	defer sf.lock.Unlock()
	for {
		ts := make_timestamp()
		seq := sf.getSequnce()
		if ts == sf.last_ts && seq <= sf.last_seq {
			time.Sleep(1 * time.Millisecond)
		} else {
			sid := make_snowflake(ts, sf.process_id, sf.datacenter_id, seq)
			sf.last_ts = ts
			sf.last_seq = seq
			return sid
		}
	}
}

type SnowFlakeCloud struct {
	zk_namespace string //show be unique for each id field
	lock_path    string
	node_path    string
	zk_lock      ZkLockInterface
	zk_util      ZkUtilInterface
	sf           *SnowFlake
	datacenterID int64
}

func (sfc *SnowFlakeCloud) ensure_paths(paths string) error {
	return sfc.zk_util.EnsurePaths(paths)
}

func (sfc *SnowFlakeCloud) Init() {
	log.Println("sf Init begin", use_etcd)
	if use_etcd {
		sfc.zk_util = _etcd_util
		_ = sfc.ensure_paths(sfc.lock_path)
		sfc.zk_lock = NewEtcdLock(sfc.lock_path)
		//_ = sfc.ensure_paths(sfc.node_path)
	} else {
		sfc.zk_util = _zk_util
		_ = sfc.ensure_paths(sfc.lock_path)
		sfc.zk_lock = NewZkLock(sfc.lock_path)
		_ = sfc.ensure_paths(sfc.node_path)
	}
	log.Println("sf Init success", sfc.lock_path, sfc.node_path)
}

func (sfc *SnowFlakeCloud) register() {
	err := sfc.zk_lock.Lock()
	if err != nil {
		log.Panicln("sf lock err", err)
	}
	nodes, err := sfc.zk_util.Children(sfc.node_path)
	if err != nil {
		log.Panicln("sf get children err", err)
	}
	log.Println("sf exist nodes", nodes)
	var pid int64
	pid = 1
	if len(nodes) > 0 {
		node_ids := make(int64arr, 0, len(nodes))
		for _, s := range nodes {
			i, _ := strconv.ParseInt(s, 10, 64)
			node_ids = append(node_ids, i)
		}
		sort.Sort(node_ids)
		log.Println("sf node_ids", node_ids)
		if node_ids[len(node_ids)-1] < max_process_id-1 {
			pid = node_ids[len(node_ids)-1] + 1
		} else {
			log.Println("node reach max", node_ids[len(node_ids)-1])
			for i := int64(0); i < node_ids[len(node_ids)-1]+1; i++ {
				log.Println("node id", i)
				if i+1 != node_ids[i] {
					pid = i + 1
					break
				} else if i == int64(len(node_ids))-1 {
					pid = i + 2
					break
				} else if node_ids[i] != node_ids[i+1]-1 {
					pid = node_ids[i] + 1
					break
				}
			}
		}
	}
	log.Println("zksnowflake process_id", pid)
	pid_str := strconv.FormatInt(pid, 10)
	_, err = sfc.zk_util.CreateFlagEphemeral(sfc.node_path+"/"+pid_str, []byte(pid_str))
	if err != nil {
		log.Panicln("sf Create Ephemeral key err", sfc.node_path, err)
	}
	log.Println("sf Create Ephemeral key success", sfc.node_path, pid)
	sfc.sf = &SnowFlake{process_id: pid, datacenter_id: sfc.datacenterID}
	//
	err = sfc.zk_lock.Unlock()
	if err != nil {
		log.Panicln("sf unlock err", err)
	}
	log.Println("sf Create Ephemeral key ok", sfc.node_path, pid)
}

func (sfc *SnowFlakeCloud) Gen() int64 {
	return sfc.sf.gen()
}