1. 框架定位

GroupDataProcessor 是一个通用的 按 Key 分组 → 异步聚合 → 批量处理 框架。核心价值:用可控的延迟换取更高的吞吐量 — 将同一 Key 的高频小数据合并后一次性处理,减少下游调用次数。

一句话概括:一个 per-key 的单任务调度器,保证同一 key 在任意时刻最多只有一个处理任务在执行,处理期间不阻塞新数据的写入。

框架由四个类构成,职责分明:

graph TD A["IGroupData<T>
数据契约: 聚合/快照/扣减"] --> B["DistributeLocker
分段锁: hash → synchronized"] B --> C["GroupDataContainer<T>
聚合容器: ConcurrentHashMap + 锁"] C --> D["GroupDataProcessor<T>
调度引擎: 递归调度 + 超时保护"] style A fill:#ede9fe,stroke:#7c3aed style B fill:#fce7f3,stroke:#be185d style C fill:#fef3c7,stroke:#d97706 style D fill:#dbeafe,stroke:#2563eb

2. 四层架构逐层解析

2.1 IGroupData<T> — 数据契约

采用 CRTP 泛型 (T extends IGroupData<T>) 使编译期获得具体子类型,避免向下转型。六个方法构成完整数据生命周期:

方法语义生命周期阶段数学类比
getKey()分组路由键路由hash 函数
mergeAdd(T)增量合并聚合可交换半群 (a⊕b = b⊕a)
copyForUse()快照隔离处理前MVCC snapshot
mergeRemove(T)差量扣减清理逆运算 (a⊖b)
isEmpty()判断残留生命终结判定零元素判定
clear()强制清空异常兜底归零

2.2 DistributeLocker — 分段锁基础设施

借鉴 ConcurrentHashMap Segment 思想:将无限 key 空间折叠到固定数量的锁槽位上。

graph LR subgraph "Key 空间 (无限)" K1["key-A"] K2["key-B"] K3["key-C"] K4["key-D"] K5["key-E"] end subgraph "locks[] (固定 64 个)" L0["slot 0
synchronized"] L1["slot 1
synchronized"] L2["slot 2
synchronized"] Ln["slot 63
synchronized"] end K1 -->|"hash%64=0"| L0 K2 -->|"hash%64=1"| L1 K3 -->|"hash%64=0"| L0 K4 -->|"hash%64=2"| L2 K5 -->|"hash%64=63"| Ln style L0 fill:#fce7f3,stroke:#be185d

key-A 和 key-C 映射到同一 slot 0,逻辑上无关但被串行化 — 这是用可接受的虚假串行化换取固定内存 + 零锁对象GC的设计取舍。

2.3 GroupDataContainer<T> — 线程安全聚合层

持有 ConcurrentHashMap<Object, T> dataMap,通过 DistributeLocker 保证同 key 的 add / copy / clear 操作串行:

flowchart LR subgraph add A1["locker.lockRun(key)"] --> A2["innerAdd: mergeAdd"] A2 --> A3["lockRunner: 调度判断"] end subgraph copyForUse B1["locker.lockRun(key)"] --> B2["data.copyForUse()"] end subgraph clearData C1["locker.lockRun(key)"] --> C2["innerRemove: mergeRemove"] C2 --> C3["lockRunner: 再调度判断"] end style A1 fill:#fce7f3 style B1 fill:#fce7f3 style C1 fill:#fce7f3

2.4 GroupDataProcessor<T> — 调度引擎

将"数据到达"事件转化为"异步处理"任务。核心数据结构:

字段类型职责
dataContainerGroupDataContainer<T>聚合缓冲区
taskFutureMapConcurrentHashMap<Object, RunningTask>per-key 任务注册表
processorVoidFunctionP1<T>业务处理器
executorGetterFunctionP1<Object, ExecutorService>线程池获取 (可按 key 分配)
configInvokerFunctionP0<RunningConfig>动态配置获取
timeOutSchedulerScheduledThreadPoolExecutor(3)延迟调度辅助器

3. 设计模式与核心思想

3.1 快照隔离 (MVCC 思想)

这是 GDP 最精妙的设计。类比数据库 MVCC:

sequenceDiagram participant W as 写入方 (addData) participant DB as dataMap 原始数据 participant R as 处理方 (scheduleProcess) participant S as 快照 W->>DB: mergeAdd(v1,v2,v3) R->>DB: 锁内 copyForUse() DB->>S: 创建快照 {v1,v2,v3} R->>R: processor.run(快照) 无锁! par 写入方继续写 W->>DB: mergeAdd(v4,v5) Note over DB: 原始: {v1,v2,v3,v4,v5} and 处理方在快照上工作 R->>R: 处理 {v1,v2,v3} end R->>DB: 锁内 mergeRemove(快照) Note over DB: 扣除{v1,v2,v3} 剩余{v4,v5}
核心洞察:copyForUse() 创建快照,mergeRemove() 差量扣减。处理期间不持锁,新数据无阻塞地合并到原始对象上。这与数据库的快照读 + 增量提交异曲同工。

3.2 递归调度链 (自驱动循环)

GDP 不使用 while(true) 轮询,而是通过回调驱动的递归调度:

flowchart TD A["addData: 数据到达"] --> B{"taskFutureMap
有运行中任务?"} B -->|否| C["scheduleRun(key, 0)
立即调度"] B -->|是| Z["return 等待当前任务处理"] C --> D["scheduleProcess(key)"] D --> E["锁内 copyForUse"] E --> F["processor.run 无锁发送"] F --> G["锁内 clearData"] G --> H{"还有数据?"} H -->|是| I["scheduleRun(key, delayTime)"] I --> D H -->|否| J["taskFutureMap.remove(key)
调度链终止"] style F fill:#ecfdf5,stroke:#059669 style C fill:#dbeafe,stroke:#2563eb style I fill:#dbeafe,stroke:#2563eb
优势:无轮询开销,无空转。只有在数据存在时才调度处理,数据耗尽即停止。通过 delayTime 实现自适应节流。

3.3 代际标识防重入 (identityObj)

// 每次 scheduleRun 生成唯一标识
Object identityObj = new Object();  // 引用唯一性 = 版本号
taskFutureMap.put(dataKey, new RunningTask(identityObj, future, ...));

// scheduleProcess 完成后校验:
if (runningTask.getIdentityObj() != identityObj) {
    return; // 说明已被超时保护重新调度, 当前任务安静退出
}

这是一个乐观版本号机制。每次 scheduleRun = 一个"代",identityObj 是该代的唯一身份。用 !=(引用比较)而非 .equals() 是刻意的:new Object() 的引用唯一性本身就是版本号,比 AtomicLong 更轻量且无 ABA 问题。

设计模式汇总

模式体现位置
生产者-消费者addData (生产) → dataMap → scheduleProcess (消费)
策略模式processor / executorGetter / configInvoker 全部外部化为函数接口
快照隔离 (MVCC)copyForUse + mergeRemove
分段锁 (Striped Lock)DistributeLocker
递归调度 (自驱动)scheduleProcess → scheduleRun → scheduleProcess
版本号 / 代际标识identityObj 防止过期任务干扰
建造者 / 流式setReject(maxSize, handler) 返回 this

4. 完整数据流

4.1 数据生命周期全景

graph TD subgraph "Phase 1: 入队聚合" A1["addData(list)"] --> A2["按 key 分组"] A2 --> A3["locker.lockRun(key)"] A3 --> A4["innerAdd: mergeAdd 到 dataMap"] A4 --> A5["lockRunner: 检查 taskFutureMap"] A5 --> A6{"有活跃任务?"} A6 -->|无| A7["scheduleRun(key, 0)"] A6 -->|有且未超时| A8["return"] A6 -->|有且超时| A9["cancel + scheduleRun"] end subgraph "Phase 2: 异步处理" B1["scheduleProcess"] --> B2["锁内 copyForUse 快照"] B2 --> B3["processor.run(快照) 无锁!"] B3 --> B4{"成功?"} B4 -->|是| B5["clearPushData = 快照"] B4 -->|异常| B6["clearPushData = null"] end subgraph "Phase 3: 清理再调度" C1["锁内 clearData"] --> C2{"clearPushData != null?"} C2 -->|是| C3["mergeRemove(快照)"] C2 -->|否| C4["不执行 remove 数据保留"] C3 --> C5{"isEmpty?"} C5 -->|是| C6["dataMap.remove(key)
taskFutureMap.remove(key)
生命周期结束"] C5 -->|否| C7["scheduleRun(key, delayTime)
继续下一轮"] C4 --> C7 end A7 --> B1 A9 --> B1 B5 --> C1 B6 --> C1 C7 --> B1 style B3 fill:#ecfdf5,stroke:#059669 style C6 fill:#f3f4f6,stroke:#6b7280 style A3 fill:#fce7f3,stroke:#be185d style B2 fill:#fce7f3,stroke:#be185d style C1 fill:#fce7f3,stroke:#be185d

4.2 并行时序 — 边攒边发

sequenceDiagram participant BT1 as 业务线程1 participant BT2 as 业务线程2 participant LK as DistributeLocker participant DM as dataMap participant ET as Executor线程 BT1->>LK: lockRun(keyA) LK->>DM: mergeAdd(v1,v2) LK->>LK: lockRunner → scheduleRun BT1->>LK: unlock ET->>LK: lockRun(keyA) copyForUse LK->>ET: 快照{v1,v2} ET->>LK: unlock par 业务线程继续写入 BT2->>LK: lockRun(keyA) LK->>DM: mergeAdd(v3,v4) BT2->>LK: unlock and Executor线程处理快照 ET->>ET: processor.run({v1,v2}) 5ms 无锁 end ET->>LK: lockRun(keyA) clearData LK->>DM: mergeRemove({v1,v2}) Note over DM: 剩余 {v3,v4} LK->>LK: lockRunner → scheduleRun(0) ET->>LK: unlock ET->>LK: lockRun(keyA) copyForUse LK->>ET: 快照{v3,v4} ET->>ET: processor.run({v3,v4})

5. 分段锁深度解析

5.1 Hash 算法

private int indexForArray(int length, int h) {
    if ((length & (length - 1)) == 0) {
        // 2 的幂: 扰动函数 + 位掩码 (借鉴 HashMap)
        return (h ^ (h >>> 16)) & (length - 1);
    }
    // 非 2 的幂: 取模
    return Math.abs(h % length);
}
路径条件算法性能
快速路径length 是 2 的幂(h ^ (h>>>16)) & (length-1)1次异或 + 1次移位 + 1次与 ≈ 3 CPU指令
慢速路径length 非 2 的幂Math.abs(h % length)1次取模(除法) ≈ 20-40 CPU指令
边界问题:慢速路径中 h == Integer.MIN_VALUEMath.abs 返回负数(补码溢出),% length 得到负索引。实际使用中建议 size 为 2 的幂以走快速路径。

5.2 锁 Metrics 采集

public <T> T lockRun(Object key, FunctionP0<T> runner) {
    Pair<AtomicInteger, Object> lockPair = getLockPair(key);
    AtomicInteger waiting = lockPair.getLeft();
    int count = waiting.incrementAndGet();   // ① 等待计数 +1
    try {
        // ② 上报当前槽位等待线程数
        summary(LOCK_WAITING_DISTRIBUTE, count, tags);
        Timer.Sample sample = Timer.start(); // ③ 开始计时

        synchronized (lockPair.getRight()) { // ④ 获锁 (可能阻塞)
            // ⑤ 上报获锁等待耗时
            time(LOCK_TIME_DISTRIBUTE, sample, tags);
            return runner.run();             // ⑥ 锁内执行
        }
    } finally {
        waiting.decrementAndGet();           // ⑦ 等待计数 -1
    }
}
graph LR A["进入 lockRun"] --> B["waiting++
上报等待深度"] B --> C["Timer.start()"] C --> D["synchronized 获锁
(可能阻塞)"] D --> E["Timer.stop
上报等待耗时"] E --> F["runner.run()
锁内执行"] F --> G["释放锁"] G --> H["waiting--"] style D fill:#fce7f3,stroke:#be185d style F fill:#ecfdf5,stroke:#059669

两个 Metrics 构成锁热度二维视图

5.3 并发不变式

关键不变式:同一 key 的所有数据操作(merge / copy / remove)和调度决策(scheduleRun / remove task)都在同一把分段锁内完成。

虽然 scheduleRun 方法本身没有加锁,但它只在两个地方被调用:

  1. addData 的 lockRunner 回调中 — 在分段锁内
  2. clearData 的 lockRunner 回调中 — 在分段锁内

这是隐式锁传播:scheduleRun 依赖调用者的锁保护。分段锁不仅保护了 dataMap 的数据一致性,也间接保护了 taskFutureMap 中同 key 的调度一致性。

6. 调度机制

6.1 三级延迟策略

long delayTime = clearPushData != null && !clearPushData.isEmpty()
    ? 0                                               // 级别1: 快照未处理完 → 立即
    : (nextDelayTime != null ? nextDelayTime : 1000L); // 级别2: 处理器自定义 / 级别3: 默认1s
级别条件delay语义
1快照数据未处理完0ms数据量大,立即继续(吞吐优先)
2processorP1 返回自定义值自定义处理器自适应节流
3默认1000ms防空转,降低 CPU

6.2 超时保护

flowchart TD A["新数据到达 addData"] --> B["检查 taskFutureMap"] B --> C{"RunningTask 存在?"} C -->|否| D["scheduleRun 首次调度"] C -->|是| E{"runTime < invokeTimeOut?"} E -->|是| F["return 正常等待"] E -->|否| G["AlarmUtils.error 告警"] G --> H{"removeTimeOut?"} H -->|true| I["cancel(false) + remove
scheduleRun 重新调度"] H -->|false| J["仅告警 不干预"] style G fill:#fef2f2,stroke:#ef4444 style I fill:#dbeafe,stroke:#2563eb
Lazy 检测:超时检查发生在新数据到达时(而非独立定时器)。如果某 key 超时后再无新数据,该超时任务永远不会被发现。这是"只有活跃 key 才能享受超时保护"的设计取舍。

6.3 背压与拒绝策略

if (maxSize > 0 && rejectHandler != null && dataContainer.size() >= maxSize) {
    for (T data : dataList) { rejectHandler.run(data); }
    return;
}
维度说明
阈值dataMap 的 key 数量(不是数据条目总量)
策略rejectHandler 函数接口,调用方实现(丢弃/告警/死信队列)
精度软限制:check-then-act 非原子,高并发下可短暂超限
盲区单 key 下聚合海量数据时无感知(getWaitingSize 为此预留)

7. 关键代码路径解析

7.1 scheduleRun — 延迟投递双路径

private void scheduleRun(Object dataKey, long time, RunningConfig runningConfig) {
    ExecutorService executorService = executorGetter.run(dataKey);
    Object identityObj = new Object();  // 版本号
    Runnable runLogic = () -> scheduleProcess(dataKey, identityObj);
    Future<?> future;
    AtomicReference<Future<?>> secondFutureRef = new AtomicReference<>();

    if (time <= 0) {
        // 路径A: 立即提交
        future = executorService.submit(runLogic);
    } else {
        if (executorService instanceof ScheduledExecutorService) {
            // 路径B: 原生延迟
            future = ((ScheduledExecutorService) executorService)
                     .schedule(runLogic, time, TimeUnit.MILLISECONDS);
        } else {
            // 路径C: 二级中转 — 先定时器延迟, 再提交到业务线程池
            future = this.timeOutScheduler.schedule(() -> {
                Future<?> secondFuture = executorService.submit(runLogic);
                secondFutureRef.set(secondFuture);  // 二级 Future 用于级联 cancel
            }, time, TimeUnit.MILLISECONDS);
        }
    }
    this.taskFutureMap.put(dataKey, new RunningTask(identityObj, future, secondFutureRef));
}
flowchart TD A["scheduleRun(key, time)"] --> B{"time <= 0?"} B -->|是| C["路径A: executor.submit
立即执行"] B -->|否| D{"executor 支持 schedule?"} D -->|是| E["路径B: executor.schedule
原生延迟"] D -->|否| F["路径C: timeOutScheduler.schedule
→ executor.submit
二级中转"] C --> G["taskFutureMap.put(key, RunningTask)"] E --> G F --> G style C fill:#ecfdf5 style E fill:#dbeafe style F fill:#fef3c7

路径C的 secondFutureRef 解决了"定时器 Future 和实际执行 Future 不是同一个"的问题。RunningTask.cancel() 会级联取消两层 Future。

7.2 clearData 回调中的再调度决策

dataContainer.clearData(dataKey, clearPushData, (lockData) -> {
    // lockData: mergeRemove 后的剩余数据, null 表示已清空

    if (lockData == null) {
        // 无剩余数据 → 调度链终止
        taskFutureMap.remove(dataKey);
        return;
    }

    // identityObj 校验: 防止被超时保护替换后的旧任务干扰
    RunningTask runningTask = taskFutureMap.get(dataKey);
    if (runningTask != null && runningTask.getIdentityObj() != identityObj) {
        return; // 已被新一代任务替换, 安静退出
    }

    // 还有数据 → 继续调度
    scheduleRun(dataKey, delayTime, runningConfig);
});
这段回调在分段锁内执行,与 addData 的 lockRunner 互斥,保证了调度决策的原子性:不会出现"clearData 判断无数据要退出"与"addData 判断无任务要调度"同时发生的竞态。

8. GroupDataProcessor vs Disruptor

graph LR subgraph "GDP: Push 模型" GA["数据到达"] -->|"addData 推入"| GB["dataMap 聚合"] GB -->|"回调触发"| GC["调度处理"] end subgraph "Disruptor: Pull 模型" DA["数据到达"] -->|"publish 写入"| DB["RingBuffer"] DB -->|"消费者拉取"| DC["EventHandler"] end style GB fill:#fef3c7,stroke:#d97706 style DB fill:#dbeafe,stroke:#2563eb
维度GroupDataProcessorDisruptor
模型Push: 数据到达驱动调度Pull: 消费者主动从 RingBuffer 拉取
核心价值数据聚合后再处理
(trade latency for throughput)
极致低延迟事件传递
(trade memory for latency)
缓冲区ConcurrentHashMap
无界, 按 key 聚合
RingBuffer
固定大小, 预分配
背压软限制: key 数量阈值硬限制: RingBuffer 满则生产者自旋
线程所有权无固定绑定: 线程池中任意线程强绑定: EventHandler 绑定一个线程
数据聚合核心特性: mergeAdd 原地合并不支持: 每个事件独立处理
DistributeLocker (分段 synchronized)无锁: volatile sequence + CAS
GC频繁创建/销毁快照对象预分配 Entry, 几乎零 GC
有序性同 key 有序, 跨 key 无序全局有序 (单生产者) 或分区有序
延迟有意引入延迟 (delayTime) 换聚合追求极致低延迟
适用场景推送合并、计数汇总、批量写入超低延迟事件流、交易撮合

内存模型对比

graph TD subgraph "GDP 内存" GM1["ConcurrentHashMap
动态增长"] GM2["Object[] locks
固定分配"] GM3["copyForUse 快照
频繁分配/回收"] GM4["RunningTask
per-key 分配"] end subgraph "Disruptor 内存" DM1["RingBuffer Entry[]
一次性预分配"] DM2["Padding
消除伪共享"] DM3["Sequence
volatile long"] end style GM3 fill:#fef3c7 style DM1 fill:#ecfdf5
对象GDP 生命期Disruptor 生命期
缓冲区动态: CHM entry 随 key 增减静态: 预分配, 永不回收
数据载体copyForUse 产生短命快照
每次处理 1 个 + GC 1 个
RingBuffer Entry 复用
零分配
调度状态RunningTask per key
~40B/key
Sequence 共享
~8B total
GC 压力中: 快照 + lambda + 临时 Map极低: 几乎零分配
选型建议:
  • 需要按 key 聚合后批处理(推送合并、批量写入)→ GDP
  • 需要极致低延迟事件流(交易撮合、实时计算)→ Disruptor
  • 两者结合(如 V2 PushEventHandler):Disruptor 做高速入口,GDP 模式做聚合+发送

9. 优劣与风险评估

9.1 设计优点

优点说明
分层解耦锁、容器、调度三层各司其职,可独立测试和替换
快照隔离处理期间不阻塞写入,最大化并行度
自驱动调度无轮询开销,数据耗尽即停,delayTime 自适应节流
代际标识identityObj 简洁有效解决超时重调度版本冲突
锁 MetricswaitingCount + lockTime 二维视图,线上问题排查友好
高度可配processor / executor / config 全部外部化,通用性极强

9.2 潜在风险

风险分析影响
异常后调度链断裂scheduleProcess 外层 catch 设置 scheduleError=true 后 return。dataMap 和 taskFutureMap 中的数据不被清理,该 key 永久"冻结"。数据内存泄漏,该 key 永不处理
隐式锁传播scheduleRun 本身不加锁,依赖调用者在锁内调用。若未来在锁外调用则破坏不变式。并发安全隐患
secondFutureRef 竞态路径C中,定时器线程 set(secondFuture) 与 cancel 之间有时间窗口。cancel 时 secondFuture 可能还是 null,无法级联取消。超时保护 cancel 失效
Lazy 超时检测超时检查依赖新数据触发。不活跃 key 超时后无人发现。僵尸任务占用线程
无数据 1s 空转默认 delayTime=1000ms,低频 key 每秒空查一次。100K key 时产生 100K 个定时任务。线程池队列膨胀

9.3 优化建议

优化说明
异常恢复机制增加定期巡检线程,扫描 taskFutureMap 中长期未活动的 entry,触发恢复或清理
批量 add 锁优化按锁槽位分组,同一槽位的所有 key 一次锁内完成,减少锁获取次数
copyForUse 交换优化对于支持 swap 语义的数据结构,用交换代替拷贝,避免快照对象分配
背压维度细化增加基于 getWaitingSize() 总和的数据量级背压
显式锁标注对 scheduleRun 加 @GuardedBy 注解或断言,防止锁外误调用

9.4 总评

GroupDataProcessor 是一个设计精良的 per-key 异步聚合处理框架。其快照隔离思想(MVCC)、递归自驱动调度、代际标识防重入三大设计堪称典范。分段锁 + Metrics 的组合为生产运维提供了优秀的可观测性。主要风险在于异常后无恢复机制和隐式锁传播。作为通用框架,其抽象程度和可扩展性远优于专用实现。

GroupDataProcessor 全面技术深度分析 — 2026-03-20