流水号设计及Leaf的升级使用(流水号生成算法)

一、需求背景

由于业务需要对数据和消息等进行唯一的标识。需要进行唯一流水号的设计。

  1. 返回数据的流水号(requestId)
  2. 记录唯一日志数据流水号
  3. 发送的消息唯一流水号
  4. 数据库主键

二、常用方法介绍

2.1 UUID

UUID是标准形式包含32个16进制数字,以连字号分为5段,形式为a-b-c-d-e的36个字符,示例550e8400-e29b-41d4-a716-446655440000

优点:

性能非常高:本地生成,没有网络消耗

缺点:

不易于存储:UUID太长,16字节128位,通常以36长度的字符串标识,很多场景不适用

信息不安全:基于MAC地址生成UUID的算法可能会造成MAC地址泄露。

ID作为主键时不适用。

UUID无序,在InnoDB引擎下,UUID的无序性会引起数据位置频繁变动,严重影响性能。

2.2 类snowflake方案(雪花算法)

这种方案大致来说是一种以划分命名空间来生产ID的一种算法,这种方案把64位bit分别划分成多段,分开来表示机器、时间等。snowflake中的64-bit分别表示如下图(图片来自网络)所示:


优点:

毫秒数在高位,自增序列在地位,整个ID都是趋势递增的。

不依赖数据库等第三方系统,以服务的方式部署,稳定性高,生成ID的性能也非常高。

可以根据自身业务特性分配bit位,非常灵活。

缺点:

强依赖机器时钟,如果机器上时钟回拨,会导致发号重复或者服务会处于不可用状态。

雪花算法(Java)

/**
 * Twitter_Snowflake
* SnowFlake的结构如下(每部分用-分开):
* 0 - 0000000000 0000000000 0000000000 0000000000 0 - 00000 - 00000 - 000000000000
* 1位标识,由于long基本类型在Java中是带符号的,最高位是符号位,正数是0,负数是1,所以id一般是正数,最高位是0
* 41位时间截(毫秒级),注意,41位时间截不是存储当前时间的时间截,而是存储时间截的差值(当前时间截 - 开始时间截) * 得到的值),这里的的开始时间截,一般是我们的id生成器开始使用的时间,由我们程序来指定的(如下下面程序IdWorker类的startTime属性)。41位的时间截,可以使用69年,年T = (1L << 41) / (1000L * 60 * 60 * 24 * 365) = 69
* 10位的数据机器位,可以部署在1024个节点,包括5位datacenterId和5位workerId
* 12位序列,毫秒内的计数,12位的计数顺序号支持每个节点每毫秒(同一机器,同一时间截)产生4096个ID序号
* 加起来刚好64位,为一个Long型。
* SnowFlake的优点是,整体上按照时间自增排序,并且整个分布式系统内不会产生ID碰撞(由数据中心ID和机器ID作区分),并且效率较高,经测试,SnowFlake每秒能够产生26万ID左右。 */ @Component public class SnowFlake { // ==============================Fields=========================================== /** 开始时间截 (2015-01-01) */ private final long twepoch = LocalDateTime.now().toEpochSecond(ZoneOffset.of("+8")); /** 机器id所占的位数 */ private final long workerIdBits = 5L; /** 数据标识id所占的位数 */ private final long dataCenterIdBits = 5L; /** 支持的最大机器id,结果是31 (这个移位算法可以很快的计算出几位二进制数所能表示的最大十进制数) */ private final long maxWorkerId = -1L ^ (-1L << workerIdBits); /** 支持的最大数据标识id,结果是31 */ private final long maxDataCenterId = -1L ^ (-1L << dataCenterIdBits); /** 序列在id中占的位数 */ private final long sequenceBits = 12L; /** 机器ID向左移12位 */ private final long workerIdShift = sequenceBits; /** 数据标识id向左移17位(12+5) */ private final long dataCenterIdShift = sequenceBits + workerIdBits; /** 时间截向左移22位(5+5+12) */ private final long timestampLeftShift = sequenceBits + workerIdBits + dataCenterIdBits; /** 生成序列的掩码,这里为4095 (0b111111111111=0xfff=4095) */ private final long sequenceMask = -1L ^ (-1L << sequenceBits); /** 工作机器ID(0~31) */ private long workerId; /** 数据中心ID(0~31) */ private long dataCenterId; /** 毫秒内序列(0~4095) */ private long sequence = 0L; /** 上次生成ID的时间截 */ private long lastTimestamp = -1L; private static SnowFlake idWorker; static { idWorker = new SnowFlake(getWorkId(),getDataCenterId()); } //==============================Constructors===================================== /** * 构造函数 * @param workerId 工作ID (0~31) * @param dataCenterId 数据中心ID (0~31) */ public SnowFlake(long workerId, long dataCenterId) { if (workerId > maxWorkerId || workerId < 0) { throw new IllegalArgumentException(String.format("workerId can't be greater than %d or less than 0", maxWorkerId)); } if (dataCenterId > maxDataCenterId || dataCenterId < 0) { throw new IllegalArgumentException(String.format("dataCenterId can't be greater than %d or less than 0", maxDataCenterId)); } this.workerId = workerId; this.dataCenterId = dataCenterId; } // ==============================Methods========================================== /** * 获得下一个ID (该方法是线程安全的) * @return SnowflakeId */ public synchronized long nextId() { long timestamp = timeGen(); //如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过这个时候应当抛出异常 if (timestamp < lastTimestamp) { throw new RuntimeException( String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - timestamp)); } //如果是同一时间生成的,则进行毫秒内序列 if (lastTimestamp == timestamp) { sequence = (sequence + 1) & sequenceMask; //毫秒内序列溢出 if (sequence == 0) { //阻塞到下一个毫秒,获得新的时间戳 timestamp = tilNextMillis(lastTimestamp); } } //时间戳改变,毫秒内序列重置 else { sequence = 0L; } //上次生成ID的时间截 lastTimestamp = timestamp; //移位并通过或运算拼到一起组成64位的ID return ((timestamp - twepoch) << timestampLeftShift) | (dataCenterId << dataCenterIdShift) | (workerId << workerIdShift) | sequence; } /** * 阻塞到下一个毫秒,直到获得新的时间戳 * @param lastTimestamp 上次生成ID的时间截 * @return 当前时间戳 */ protected long tilNextMillis(long lastTimestamp) { long timestamp = timeGen(); while (timestamp <= lastTimestamp) { timestamp = timeGen(); } return timestamp; } /** * 返回以毫秒为单位的当前时间 * @return 当前时间(毫秒) */ protected long timeGen() { return System.currentTimeMillis(); } private static Long getWorkId(){ try { String hostAddress = Inet4Address.getLocalHost().getHostAddress(); int[] ints = StringUtils.toCodePoints(hostAddress); int sums = 0; for(int b : ints){ sums += b; } return (long)(sums % 32); } catch (UnknownHostException e) { // 如果获取失败,则使用随机数备用 return RandomUtils.nextLong(0,31); } } private static Long getDataCenterId(){ int[] ints = StringUtils.toCodePoints(SystemUtils.getHostName()); int sums = 0; for (int i: ints) { sums += i; } return (long)(sums % 32); } /** * 静态工具类 */ public static Long generateId(){ long id = idWorker.nextId(); return id; } //==============================Test============================================= /** 测试 */ // public static void main(String[] args) { // System.out.println(System.currentTimeMillis()); // long startTime = System.nanoTime(); // for (int i = 0; i < 50000; i++) { // long id = SnowFlake.generateId(); // System.out.println(id); // } // System.out.println((System.nanoTime()-startTime)/1000000+"ms"); // } }

2.3 数据库生成

设置字段递增

以MySQL举例,利用给字段设置auto_increment_incrementauto_increment_offset来保证ID自增,每次业务使用下列SQL读写MySQL得到ID号。

begin;
    REPLACE INTO Tickets64 (stub) VALUES ('a');
    SELECT LAST_INSERT_ID();
commit;

优点:

非常简单,利用数据库系统功能实现,成本小。

ID号单调递增。

缺点:

强依赖DB,当DB异常时整个系统不可用,属于致命问题。配置主从复制可以尽可能地增加可用性,但是数据一致性在特殊情况下难以保证。主从切换时的不一致可能会导致重复发号。

ID发号性能瓶颈限制在单台MySQL的读写性能。

三、设计方案

3.1 Leaf

3.1.1 Leaf特性:

  • 全局唯一,绝对不会出现重复的id,且id整体趋势递增。
  • 高可用,服务完全基于分布式架构,及时MySQL宕机,也能容忍一段时间的数据库不可用
  • 高并发低延时,在CentOS 4C8G的虚拟机上,远程调用QPS可达5w+,TP99在1ms内
  • 接入简单,直接通过公司RPC服务或者HTTP调用接入即可。

3.1.2 Leaf 实现

美团开源的分布式ID生成器。分别对第二种nowflake和第三种数据库实现的方案进行了优化。实现了Leaf-segment和Leaf-snowFlake方案。

Leaf-segment数据库方案

原方案是每次获取ID都得读写一次数据库,造成的压力大。改为利用proxy server 批量获取,每次获取一个segment(step决定大小)号段的值。用完之后再去数据库获取新的号段,可以大大减轻数据库的压力。

各个业务不同的发号需要用biz_tag字段来区分,每个biz_tag的ID获取相互隔离,互不影响。如果以后又性能需求需要对数据库进行扩容,只需要对biz_tag分库分表就好了

数据库表设计如下

+-------------+--------------+------+-----+-------------------+-----------------------------+
| Field       | Type         | Null | Key | Default           | Extra                       |
+-------------+--------------+------+-----+-------------------+-----------------------------+
| biz_tag     | varchar(128) | NO   | PRI |                   |                             |
| max_id      | bigint(20)   | NO   |     | 1                 |                             |
| step        | int(11)      | NO   |     | NULL              |                             |
| desc        | varchar(256) | YES  |     | NULL              |                             |
| update_time | timestamp    | NO   |     | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
+-------------+--------------+------+-----+-------------------+-----------------------------+

重要字段说明:biz_tag用来区分业务,max_id表示该biz_tag目前所被分配的ID号段的最大值,step表示每次分配的号段长度。原来获取ID每次都需要写数据库,现在只需要把step设置得足够大,比如1000。那么只有当1000个号被消耗完了之后才会去重新读写一次数据库。读写数据库的频率从1减小到了1/step,大致架构如下图所示:


test_tag在第一台Leaf机器上是1~1000的号段,当这个号段用完时,会去加载另一个长度为step=1000的号段,假设另外两台号段都没有更新,这个时候第一台机器新加载的号段就应该是3001~4000。同时数据库对应的biz_tag这条数据的max_id会从3000被更新成4000

Leaf Server加载号段的SQL语句如下:

Begin
    UPDATE table SET max_id=max_id+step WHERE biz_tag=xxx
    SELECT tag, max_id, step FROM table WHERE biz_tag=xxx
Commit

这种模式的优缺点

优点:

  • Leaf服务可以很方便的线性扩展,性能完全能够支撑大多数业务场景。
  • ID号码是趋势递增的8byte的64位数字,满足上述数据库存储的主键要求。
  • 容灾性高:Leaf服务内部有号段缓存,即使DB宕机,短时间内Leaf仍能正常对外提供服务。
  • 可以自定义max_id的大小,非常方便业务从原有的ID方式上迁移过来。

缺点:

  • ID号码不够随机,能够泄露发号数量的信息,不太安全。
  • TP999数据波动大,当号段使用完之后还是会hang在更新数据库的I/O上,tg999数据会出现偶尔的尖刺。
  • DB宕机会造成整个系统不可用。

双buff优化

对于第二个缺点,Leaf-segment做了一些优化,

Leaf 取号段的时机是在号段消耗完的时候进行的,也就意味着号段临界点的ID下发时间取决于下一次从DB取回号段的时间,并且在这期间进来的请求也会因为DB号段没有取回来,导致线程阻塞。如果请求DB的网络和DB的性能稳定,这种情况对系统的影响是不大的,但是假如取DB的时候网络发生抖动,或者DB发生慢查询就会导致整个系统的响应时间变慢。

为此,我们希望DB取号段的过程能够做到无阻塞,不需要在DB取号段的时候阻塞请求线程,即当号段消费到某个点时就异步的把下一个号段加载到内存中。而不需要等到号段用尽的时候才去更新号段。这样做就可以很大程度上的降低系统的TP999指标。详细实现如下图所示:


采用双buffer的方式,Leaf服务内部有两个号段缓存区segment。当前号段已下发10%时,如果下一个号段未更新,则另启一个更新线程去更新下一个号段。当前号段全部下发完后,如果下个号段准备好了则切换到下个号段为当前segment接着下发,循环往复。

  • 每个biz-tag都有消费速度监控,通常推荐segment长度设置为服务高峰期发号QPS的600倍(10分钟),这样即使DB宕机,Leaf仍能持续发号10-20分钟不受影响。
  • 每次请求来临时都会判断下个号段的状态,从而更新此号段,所以偶尔的网络抖动不会影响下个号段的更新。

Leaf高可用容灾

对于第三点“DB可用性”问题,我们目前采用一主两从的方式,同时分机房部署,Master和Slave之间采用半同步方式[5]同步数据。同时使用公司Atlas数据库中间件(已开源,改名为DBProxy)做主从切换。当然这种方案在一些情况会退化成异步模式,甚至在非常极端情况下仍然会造成数据不一致的情况,但是出现的概率非常小。如果你的系统要保证100%的数据强一致,可以选择使用“类Paxos算法”实现的强一致MySQL方案,如MySQL 5.7前段时间刚刚GA的MySQL Group Replication。但是运维成本和精力都会相应的增加,根据实际情况选型即可。


同时Leaf服务分IDC部署,内部的服务化框架是“MTthrift RPC”。服务调用的时候,根据负载均衡算法会优先调用同机房的Leaf服务。在该IDC内Leaf服务不可用的时候才会选择其他机房的Leaf服务。同时服务治理平台OCTO还提供了针对服务的过载保护、一键截流、动态流量分配等对服务的保护措施。

Leaf-snowflake方案

Leaf-segment方案可以生成趋势递增的ID,同时ID号是可计算的,不适用于订单ID生成场景,比如竞对在两天中午12点分别下单,通过订单id号相减就能大致计算出公司一天的订单量,这个是不能忍受的。面对这一问题,我们提供了 Leaf-snowflake方案。


Leaf-snowflake方案完全沿用snowflake方案的bit位设计,即是“1+41+10+12”的方式组装ID号。对于workerID的分配,当服务集群数量较小的情况下,完全可以手动配置。Leaf服务规模较大,动手配置成本太高。所以使用Zookeeper持久顺序节点的特性自动对snowflake节点配置wokerID。Leaf-snowflake是按照下面几个步骤启动的:

  1. 启动Leaf-snowflake服务,连接Zookeeper,在leaf_forever父节点下检查自己是否已经注册过(是否有该顺序子节点)。
  2. 如果有注册过直接取回自己的workerID(zk顺序节点生成的int类型ID号),启动服务。
  3. 如果没有注册过,就在该父节点下面创建一个持久顺序节点,创建成功后取回顺序号当做自己的workerID号,启动服务。


弱依赖ZooKeeper

除了每次会去ZK拿数据以外,也会在本机文件系统上缓存一个workerID文件。当ZooKeeper出现问题,恰好机器出现问题需要重启时,能保证服务能够正常启动。这样做到了对三方组件的弱依赖。一定程度上提高了SLA

解决时钟问题

因为这种方案依赖时间,如果机器的时钟发生了回拨,那么就会有可能生成重复的ID号,需要解决时钟回退的问题。


参见上图整个启动流程图,服务启动时首先检查自己是否写过ZooKeeper leaf_forever节点:

  1. 若写过,则用自身系统时间与leaf_forever/${self}节点记录时间做比较,若小于leaf_forever/${self}时间则认为机器时间发生了大步长回拨,服务启动失败并报警。
  2. 若未写过,证明是新服务节点,直接创建持久节点leaf_forever/${self}并写入自身系统时间,接下来综合对比其余Leaf节点的系统时间来判断自身系统时间是否准确,具体做法是取leaf_temporary下的所有临时节点(所有运行中的Leaf-snowflake节点)的服务IP:Port,然后通过RPC请求得到所有节点的系统时间,计算sum(time)/nodeSize。
  3. 若abs( 系统时间-sum(time)/nodeSize ) < 阈值,认为当前系统时间准确,正常启动服务,同时写临时节点leaf_temporary/${self} 维持租约。
  4. 否则认为本机系统时间发生大步长偏移,启动失败并报警。
  5. 每隔一段时间(3s)上报自身系统时间写入leaf_forever/${self}。

由于强依赖时钟,对时间的要求比较敏感,在机器工作时NTP同步也会造成秒级别的回退,建议可以直接关闭NTP同步。要么在时钟回拨的时候直接不提供服务直接返回ERROR_CODE,等时钟追上即可。或者做一层重试,然后上报报警系统,更或者是发现有时钟回拨之后自动摘除本身节点并报警,如下:

 //发生了回拨,此刻时间小于上次发号时间 
if (timestamp < lastTimestamp) {            
    long offset = lastTimestamp - timestamp;            
        if (offset <= 5) {                
            try {                    
                //时间偏差大小小于5ms,则等待两倍时间
                wait(offset << 1);//wait
                timestamp = timeGen();                    
                if (timestamp < lastTimestamp) {                       
                    //还是小于,抛异常并上报
                    throwClockBackwardsEx(timestamp);
                 }    
                } catch (InterruptedException e) {  
                    throw  e;
                }
             } else {                //throw
                throwClockBackwardsEx(timestamp);
             }  
} //分配ID

从上线情况来看,在2017年闰秒出现那一次出现过部分机器回拨,由于Leaf-snowflake的策略保证,成功避免了对业务造成的影响。

3.2 演示

源码下载地址

https://github.com/Meituan-Dianping/Leaf.git

使用方式

#segment

curl http://localhost:8080/api/segment/get/leaf-segment-test

#snowflake

curl http://localhost:8080/api/snowflake/get/test

使用sengment方式

使用sengment方式需要创建数据表,并且配置leaf.jdbc.url; leaf.jdbc.username; leaf.jdbc.password

如果不想使用该模式配置leaf.segment.enable=false即可。

CREATE DATABASE leaf
CREATE TABLE `leaf_alloc` (
  `biz_tag` varchar(128)  NOT NULL DEFAULT '',
  `max_id` bigint(20) NOT NULL DEFAULT '1',
  `step` int(11) NOT NULL,
  `description` varchar(256)  DEFAULT NULL,
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`biz_tag`)
) ENGINE=InnoDB;

insert into leaf_alloc(biz_tag, max_id, step, description) values('leaf-segment-test', 1, 2000, 'Test leaf Segment Mode Get Id')

使用号段模式监控页面

监控页面

号段模式:http://localhost:8080/cache

使用 http://localhost:8080/api/segment/get/leaf-segment-test 执行两次之后的监控数据,该页面默认一分钟刷新一次


Snowflake模式

算法取自twitter开源的snowflake算法。

如果不想使用该模式配置leaf.snowflake.enable=false即可。

配置zookeeper地址

在leaf.properties中配置leaf.snowflake.zk.address,配置leaf 服务监听的端口leaf.snowflake.port。

四、系统接入方式

上边可以直接用api的方式,以leaf单独部署项目模式使用,接下来使用一种spring-boot-starter方式

1:下载git源码

git clone https://github.com/Meituan-Dianping/Leaf.git

2:转到feature/spring-boot-starter分支

 git checkout feature/spring-boot-starter

3:进入leaf项目

cd leaf

4:编译安装

mvn clean install -Dmaven.test.skip=true

然后在项目的pom中引入


    leaf-boot-starter
    com.sankuai.inf.leaf
    1.0.1-RELEASE

在项目的启动类上加入

@EnableLeafServer

在需要的地方直接导入


至此,项目中就可以使用leaf分布式ID生成器了。

五、问题

在下载leaf源码之后,启动时出现问题。默认leaf是支持mysql5.1.18的,在使用mysql数据库版本为8以上时无法编译成功,需要在原有项目中加入驱动。,这样才能启动成功完成部署。


Leaf.properties


Constants


sengmentService


六、本地接入

将leaf作为本地服务供业务服务调取。需要对nacos进行一定的改造,第一步就是接入nacos,使leaf作为一个单独的服务部署到nacos上,以便提供feign和http服务调用。

6.1 pom.xml的修改

ps:注释的为原版本中自带的依赖。

leaf-parent/pom.xml



    4.0.0
    com.sankuai.inf.leaf
    leaf-parent
    pom
    1.0.1
    Leaf
    
        leaf-core
        leaf-server
    






















    
        UTF-8
        UTF-8


        3.5.1

        0.9.16
        2.6.0


        2.9.6
        8.0.13
        2.4


    
    
        
            
                com.sankuai.inf.leaf
                leaf-core
                1.0.1
            





            
            
                com.alibaba
                druid-spring-boot-starter
                1.2.1
            

            
            
                org.apache.curator
                curator-recipes
                ${curator.version}
                
                    
                        log4j
                        log4j
                    
                
            
            
                commons-io
                commons-io
                ${commons-io.version}
            






            
            
                org.mybatis.spring.boot
                mybatis-spring-boot-starter
                2.2.0
            

            
                org.apache.logging.log4j
                log4j-spring-cloud-config-client
            




















































            
                mysql
                mysql-connector-java
                ${mysql-connector-java.version}
            










        
    
    
        leaf
        
            
                org.apache.maven.plugins
                maven-compiler-plugin
                ${maven.compiler.version}
                
                    1.7
                    1.7
                
            
            
                org.jacoco
                jacoco-maven-plugin
                0.7.5.201505241946
                
                    
                        pre-unit-test
                        
                            prepare-agent
                        
                        
                            
                                ${project.build.directory}/${project.artifactId}-jacoco.exec
                            
                        
                    
                    
                        post-unit-test
                        test
                        
                            report
                        
                        
                            
                                ${project.build.directory}/${project.artifactId}-jacoco.exec
                            
                            ${project.build.directory}/jacoco
                        
                    
                
            
        
    


leaf-server/pom.xml



    4.0.0
    
        com.sankuai.inf.leaf
        leaf-parent
        1.0.1
    
    com.sankuai.inf.leaf
    leaf-server
    1.0.1
    jar
    leaf-server
    Leaf HTTP Server
    
        2.2.1.RELEASE
    
    
        
            
                org.springframework.boot
                spring-boot-dependencies
                ${spring-boot-dependencies.version}
                pom
                import
            
        
    
    
        
            org.springframework.boot
            spring-boot-starter-web
        
        
        
            org.springframework.boot
            spring-boot-starter-freemarker
        

        
            org.springframework.boot
            spring-boot-starter-test
            test
        
        
            com.sankuai.inf.leaf
            leaf-core
        


        
            com.alibaba
            druid
            1.0.18
        
        
        
            org.apache.curator
            curator-recipes
            
                
                    log4j
                    log4j
                
            
        
        
            junit
            junit
            test
        
        
        
            com.alibaba.cloud
            spring-cloud-starter-alibaba-nacos-discovery
            2.2.1.RELEASE
        
        
            com.alibaba.cloud
            spring-cloud-starter-alibaba-nacos-config
            2.2.1.RELEASE
        




    
    
        
            
                org.springframework.boot
                spring-boot-maven-plugin
                ${spring-boot-dependencies.version}
                
                    com.sankuai.inf.leaf.server.LeafServerApplication
                
                
                    
                        
                            repackage
                        
                    
                
            



















        
    

leaf.core/pom.xml


    4.0.0
    
        com.sankuai.inf.leaf
        leaf-parent
        1.0.1
    
    com.sankuai.inf.leaf
    leaf-core
    jar
    1.0.1
    leaf-core
    
        8.0.13
        2.4


    
    
        
            org.springframework.boot
            spring-boot-starter
            2.2.1.RELEASE
        
        
        
            org.mybatis.spring.boot
            mybatis-spring-boot-starter
            2.2.0
        























        
            commons-io
            commons-io
        
        
        
            org.apache.curator
            curator-recipes
            provided
        
        
            com.fasterxml.jackson.core
            jackson-databind
            ${jackson-databind.version}
            provided
        
















































        
            mysql
            mysql-connector-java
        












    

6.2 配置文件

新增两个配置文件application.yml 和 bootstrap.yml

application.yml

server:
  port: 8889
#  port: ${SERVER_PORT:9009}
spring:
  application:
    name: bond-platform-leaf-config
  cloud:
    nacos:
      discovery:
        server-addr: ${REGISTER_HOST:nacos.bondtechinuat.com}:${REGISTER_PORT:80}
        namespace: ${NAMESPACE:ba9dd0c4-edfd-4cd4-8e88-dfa9e7dcab7e}

      config:
        server-addr: ${REGISTER_HOST:nacos.bondtechinuat.com}:${REGISTER_PORT:80}
        namespace: ${NAMESPACE:ba9dd0c4-edfd-4cd4-8e88-dfa9e7dcab7e}
    #        shared-configs:
    #          - bond-platform-leaf-config.yml
    #        server-addr: nacos.bondtechinuat.com:80
    #        namespace: ba9dd0c4-edfd-4cd4-8e88-dfa9e7dcab7e
    config:
      enabled: false

application.yml


spring:
  freemarker:
    cache: false
    charset: UTF-8
    check-template-location: true
    content-type: text/html
    expose-request-attributes: true
    expose-session-attributes: true
    request-context-attribute: request
#  cloud:
#    sentinel:
#      transport:
#        dashboard: ${message.sentinel.dashboard.host}:${message.sentinel.dashboard.port}
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://10.0.3.28:3306/leaf?characterEncoding=UTF-8&useUnicode=true&useSSL=false
    username: root
    password: tF%$AFFG0bhzjIGz

6.3 代码报错

代码中因为把logjar包去掉,导致log会报错,直接注释掉报错行或者修改日志输出格式即可。

6.4、接口

6.4.1 调用segment接口生成id

GET /api/segment/get/{{key}}

6.4.2 调用snowflake接口生成id

GET /api/snowflake/get/{key}

七、源码分析

7.1、 segment方式调用(以master分支代码为主)

源码流程


7.2、源码

LeafController


@RequestMapping(value = "/api/segment/get/{key}")
public String getSegmentId(@PathVariable("key") String key) {
    return get(key, segmentService.getId(key));
}


SegmentService

package com.sankuai.inf.leaf.server.service;

import com.alibaba.druid.pool.DruidDataSource;
import com.sankuai.inf.leaf.IDGen;
import com.sankuai.inf.leaf.common.PropertyFactory;
import com.sankuai.inf.leaf.common.Result;
import com.sankuai.inf.leaf.common.ZeroIDGen;
import com.sankuai.inf.leaf.segment.SegmentIDGenImpl;
import com.sankuai.inf.leaf.segment.dao.IDAllocDao;
import com.sankuai.inf.leaf.segment.dao.impl.IDAllocDaoImpl;
import com.sankuai.inf.leaf.server.Constants;
import com.sankuai.inf.leaf.server.exception.InitException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import java.sql.SQLException;
import java.util.Properties;

@Service("SegmentService")
public class SegmentService {
    private Logger logger = LoggerFactory.getLogger(SegmentService.class);

    private IDGen idGen;
    private DruidDataSource dataSource;

    public SegmentService() throws SQLException, InitException {
        Properties properties = PropertyFactory.getProperties();
        boolean flag = Boolean.parseBoolean(properties.getProperty(Constants.LEAF_SEGMENT_ENABLE, "true"));
        if (flag) {
            // Config dataSource
            dataSource = new DruidDataSource();
            dataSource.setUrl(properties.getProperty(Constants.LEAF_JDBC_URL));
            dataSource.setDriverClassName(properties.getProperty(Constants.LEAF_JDBC_DRIVER));
            dataSource.setUsername(properties.getProperty(Constants.LEAF_JDBC_USERNAME));
            dataSource.setPassword(properties.getProperty(Constants.LEAF_JDBC_PASSWORD));
            dataSource.init();

            // Config Dao
            IDAllocDao dao = new IDAllocDaoImpl(dataSource);

            // Config ID Gen
            idGen = new SegmentIDGenImpl();
            ((SegmentIDGenImpl) idGen).setDao(dao);
            if (idGen.init()) {
                logger.info("Segment Service Init Successfully");
            } else {
                throw new InitException("Segment Service Init Fail");
            }
        } else {
            idGen = new ZeroIDGen();
            logger.info("Zero ID Gen Service Init Successfully");
        }
    }

    public Result getId(String key) {
        return idGen.get(key);
    }

    public SegmentIDGenImpl getIdGen() {
        if (idGen instanceof SegmentIDGenImpl) {
            return (SegmentIDGenImpl) idGen;
        }
        return null;
    }
}

SegmentIDGenImpl

package com.sankuai.inf.leaf.segment;

import com.sankuai.inf.leaf.IDGen;
import com.sankuai.inf.leaf.common.Result;
import com.sankuai.inf.leaf.common.Status;
import com.sankuai.inf.leaf.segment.dao.IDAllocDao;
import com.sankuai.inf.leaf.segment.model.*;
import org.perf4j.StopWatch;
import org.perf4j.slf4j.Slf4JStopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

public class SegmentIDGenImpl implements IDGen {
    private static final Logger logger = LoggerFactory.getLogger(SegmentIDGenImpl.class);

    /**
     * IDCache未初始化成功时的异常码
     */
    private static final long EXCEPTION_ID_IDCACHE_INIT_FALSE = -1;
    /**
     * key不存在时的异常码
     */
    private static final long EXCEPTION_ID_KEY_NOT_EXISTS = -2;
    /**
     * SegmentBuffer中的两个Segment均未从DB中装载时的异常码
     */
    private static final long EXCEPTION_ID_TWO_SEGMENTS_ARE_NULL = -3;
    /**
     * 最大步长不超过100,0000
     */
    private static final int MAX_STEP = 1000000;
    /**
     * 一个Segment维持时间为15分钟
     */
    private static final long SEGMENT_DURATION = 15 * 60 * 1000L;
    private ExecutorService service = new ThreadPoolExecutor(5, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue(), new UpdateThreadFactory());
    private volatile boolean initOK = false;
    private Map cache = new ConcurrentHashMap();
    private IDAllocDao dao;

    public static class UpdateThreadFactory implements ThreadFactory {

        private static int threadInitNumber = 0;

        private static synchronized int nextThreadNum() {
            return threadInitNumber++;
        }

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "Thread-Segment-Update-" + nextThreadNum());
        }
    }
    // 初始化
    @Override
    public boolean init() {
        logger.info("Init ...");
        // 确保加载到kv后才初始化成功
        updateCacheFromDb();
        initOK = true;
        updateCacheFromDbAtEveryMinute();
        return initOK;
    }
    // 每分钟都去更新缓存
    private void updateCacheFromDbAtEveryMinute() {
        ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("check-idCache-thread");
                t.setDaemon(true);
                return t;
            }
        });
        service.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                updateCacheFromDb();
            }
        }, 60, 60, TimeUnit.SECONDS);
    }
    // 更新缓存的数据
    private void updateCacheFromDb() {
        logger.info("update cache from db");
        StopWatch sw = new Slf4JStopWatch();
        try {
            List dbTags = dao.getAllTags();
            if (dbTags == null || dbTags.isEmpty()) {
                return;
            }
            List cacheTags = new ArrayList(cache.keySet());
            Set insertTagsSet = new HashSet<>(dbTags);
            Set removeTagsSet = new HashSet<>(cacheTags);
            //db中新加的tags灌进cache
            for(int i = 0; i < cacheTags.size(); i++){
                String tmp = cacheTags.get(i);
                if(insertTagsSet.contains(tmp)){
                    insertTagsSet.remove(tmp);
                }
            }
            for (String tag : insertTagsSet) {
                SegmentBuffer buffer = new SegmentBuffer();
                buffer.setKey(tag);
                Segment segment = buffer.getCurrent();
                segment.setValue(new AtomicLong(0));
                segment.setMax(0);
                segment.setStep(0);
                cache.put(tag, buffer);
                logger.info("Add tag {} from db to IdCache, SegmentBuffer {}", tag, buffer);
            }
            //cache中已失效的tags从cache删除
            for(int i = 0; i < dbTags.size(); i++){
                String tmp = dbTags.get(i);
                if(removeTagsSet.contains(tmp)){
                    removeTagsSet.remove(tmp);
                }
            }
            for (String tag : removeTagsSet) {
                cache.remove(tag);
                logger.info("Remove tag {} from IdCache", tag);
            }
        } catch (Exception e) {
            logger.warn("update cache from db exception", e);
        } finally {
            sw.stop("updateCacheFromDb");
        }
    }
    // 获取ID
    @Override
    public Result get(final String key) {
        if (!initOK) {
            return new Result(EXCEPTION_ID_IDCACHE_INIT_FALSE, Status.EXCEPTION);
        }
        if (cache.containsKey(key)) {
            SegmentBuffer buffer = cache.get(key);
            if (!buffer.isInitOk()) {
                synchronized (buffer) {
                    if (!buffer.isInitOk()) {
                        try {
                            updateSegmentFromDb(key, buffer.getCurrent());
                            logger.info("Init buffer. Update leafkey {} {} from db", key, buffer.getCurrent());
                            buffer.setInitOk(true);
                        } catch (Exception e) {
                            logger.warn("Init buffer {} exception", buffer.getCurrent(), e);
                        }
                    }
                }
            }
            return getIdFromSegmentBuffer(cache.get(key));
        }
        return new Result(EXCEPTION_ID_KEY_NOT_EXISTS, Status.EXCEPTION);
    }
    // 更新的流程
    public void updateSegmentFromDb(String key, Segment segment) {
        StopWatch sw = new Slf4JStopWatch();
        SegmentBuffer buffer = segment.getBuffer();
        LeafAlloc leafAlloc;
        if (!buffer.isInitOk()) {
            leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key);
            buffer.setStep(leafAlloc.getStep());
            buffer.setMinStep(leafAlloc.getStep());//leafAlloc中的step为DB中的step
        } else if (buffer.getUpdateTimestamp() == 0) {
            leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key);
            buffer.setUpdateTimestamp(System.currentTimeMillis());
            buffer.setStep(leafAlloc.getStep());
            buffer.setMinStep(leafAlloc.getStep());//leafAlloc中的step为DB中的step
        } else {
            long duration = System.currentTimeMillis() - buffer.getUpdateTimestamp();
            int nextStep = buffer.getStep();
            if (duration < SEGMENT_DURATION) {
                if (nextStep * 2 > MAX_STEP) {
                    //do nothing
                } else {
                    nextStep = nextStep * 2;
                }
            } else if (duration < SEGMENT_DURATION * 2) {
                //do nothing with nextStep
            } else {
                nextStep = nextStep / 2 >= buffer.getMinStep() ? nextStep / 2 : nextStep;
            }
            logger.info("leafKey[{}], step[{}], duration[{}mins], nextStep[{}]", key, buffer.getStep(), String.format("%.2f",((double)duration / (1000 * 60))), nextStep);
            LeafAlloc temp = new LeafAlloc();
            temp.setKey(key);
            temp.setStep(nextStep);
            leafAlloc = dao.updateMaxIdByCustomStepAndGetLeafAlloc(temp);
            buffer.setUpdateTimestamp(System.currentTimeMillis());
            buffer.setStep(nextStep);
            buffer.setMinStep(leafAlloc.getStep());//leafAlloc的step为DB中的step
        }
        // must set value before set max
        long value = leafAlloc.getMaxId() - buffer.getStep();
        segment.getValue().set(value);
        segment.setMax(leafAlloc.getMaxId());
        segment.setStep(buffer.getStep());
        sw.stop("updateSegmentFromDb", key + " " + segment);
    }

    public Result getIdFromSegmentBuffer(final SegmentBuffer buffer) {
        while (true) {
            buffer.rLock().lock();
            try {
                final Segment segment = buffer.getCurrent();
                if (!buffer.isNextReady() && (segment.getIdle() < 0.9 * segment.getStep()) && buffer.getThreadRunning().compareAndSet(false, true)) {
                    service.execute(new Runnable() {
                        @Override
                        public void run() {
                            Segment next = buffer.getSegments()[buffer.nextPos()];
                            boolean updateOk = false;
                            try {
                                updateSegmentFromDb(buffer.getKey(), next);
                                updateOk = true;
                                logger.info("update segment {} from db {}", buffer.getKey(), next);
                            } catch (Exception e) {
                                logger.warn(buffer.getKey() + " updateSegmentFromDb exception", e);
                            } finally {
                                if (updateOk) {
                                    buffer.wLock().lock();
                                    buffer.setNextReady(true);
                                    buffer.getThreadRunning().set(false);
                                    buffer.wLock().unlock();
                                } else {
                                    buffer.getThreadRunning().set(false);
                                }
                            }
                        }
                    });
                }
                long value = segment.getValue().getAndIncrement();
                if (value < segment.getMax()) {
                    return new Result(value, Status.SUCCESS);
                }
            } finally {
                buffer.rLock().unlock();
            }
            waitAndSleep(buffer);
            buffer.wLock().lock();
            try {
                final Segment segment = buffer.getCurrent();
                long value = segment.getValue().getAndIncrement();
                if (value < segment.getMax()) {
                    return new Result(value, Status.SUCCESS);
                }
                if (buffer.isNextReady()) {
                    buffer.switchPos();
                    buffer.setNextReady(false);
                } else {
                    logger.error("Both two segments in {} are not ready!", buffer);
                    return new Result(EXCEPTION_ID_TWO_SEGMENTS_ARE_NULL, Status.EXCEPTION);
                }
            } finally {
                buffer.wLock().unlock();
            }
        }
    }

    private void waitAndSleep(SegmentBuffer buffer) {
        int roll = 0;
        while (buffer.getThreadRunning().get()) {
            roll += 1;
            if(roll > 10000) {
                try {
                    TimeUnit.MILLISECONDS.sleep(10);
                    break;
                } catch (InterruptedException e) {
                    logger.warn("Thread {} Interrupted",Thread.currentThread().getName());
                    break;
                }
            }
        }
    }

    public List getAllLeafAllocs() {
        return dao.getAllLeafAllocs();
    }

    public Map getCache() {
        return cache;
    }

    public IDAllocDao getDao() {
        return dao;
    }

    public void setDao(IDAllocDao dao) {
        this.dao = dao;
    }
}