1. 对比背景

GroupDataProcessor (GDP) 是项目中已有的通用异步"边攒边发"框架,基于 GroupDataContainer + DistributeLocker + 共享线程池实现。

V2 PushEventHandler 是专为 WebSocket 推送引擎设计的新方案,基于 Disruptor + 共享锁数组 + 专属 Sender 线程池实现。

两者都实现了"边合并边发送"的核心模式。本文从架构、线程模型、锁机制、数据流、重试、性能六个维度进行深度对比。

核心共识:两者都遵循同一模式 — 锁内merge、锁内copy、无锁send、锁内remove。差异在于:锁的实现、线程的归属、调度的机制、重试的策略。

2. 架构原理对比

2.1 GroupDataProcessor 架构

graph TD subgraph 调用方线程 C1["业务线程 1"] C2["业务线程 N"] end subgraph GroupDataProcessor GDC["GroupDataContainer
ConcurrentHashMap + DistributeLocker"] TFM["taskFutureMap
ConcurrentHashMap: key → RunningTask"] end subgraph 执行层 SE["共享 ScheduledExecutorService
或 ExecutorService"] SP["scheduleProcess
copyForUse → process → clearData"] end C1 -->|"addData(list)"| GDC C2 -->|"addData(list)"| GDC GDC -->|"lockRunner 回调
检查 taskFutureMap"| TFM TFM -->|"无 RunningTask → scheduleRun"| SE SE --> SP SP -->|"lockRun: copyForUse"| GDC SP -->|"processor.run (无锁)"| WSP["业务处理"] SP -->|"lockRun: clearData"| GDC SP -->|"还有数据 → 再次 scheduleRun"| SE style GDC fill:#fef3c7,stroke:#92400e style WSP fill:#ecfdf5,stroke:#059669

2.2 V2 PushEventHandler 架构

graph TD subgraph 生产者 P1["业务线程 1"] P2["业务线程 N"] end subgraph Disruptor层 RB["Ring Buffer 32768"] DT["Disruptor消费线程
onEvent: 锁内merge + 调度判断"] end subgraph 共享状态 BM["batchMap
ConcurrentHashMap"] LK["Object locks x64"] AS["activeSenders
ConcurrentHashSet"] end subgraph Sender层 S0["Sender-0 STPE coreSize=1"] S1["Sender-1"] S2["Sender-2"] S3["Sender-3"] end P1 -->|publish| RB P2 -->|publish| RB RB --> DT DT -->|"synchronized: merge"| BM DT -->|"首次key → execute"| S0 S0 -->|"synchronized: copy"| BM S0 -->|"无锁发送"| WSP["wsPushProcess"] S0 -->|"synchronized: remove"| BM S0 -->|"失败 → schedule重试"| S0 style DT fill:#dbeafe,stroke:#1e40af style WSP fill:#ecfdf5,stroke:#059669 style LK fill:#fce7f3,stroke:#be185d

2.3 核心架构差异

维度GDPV2
事件入口直接方法调用 addData()Disruptor Ring Buffer → onEvent()
缓冲容器GroupDataContainer (ConcurrentHashMap + DistributeLocker)batchMap (ConcurrentHashMap) + 64个 Object 锁
调度状态taskFutureMap: key → RunningTask (Future + identityObj)activeSenders: ConcurrentHashSet (仅存 key 的有无)
执行线程共享 ExecutorService (所有 key 共享)4 个专属 coreSize=1 STPE (hash 分区)
背压机制依赖 executor 队列大小Disruptor Ring Buffer (固定 32768) + Sender 队列

3. 线程模型对比

3.1 GDP 线程模型

调用方线程 (业务线程, 可能很多)
     │
     ▼
 addData() → DistributeLocker.lockRun(key) → merge + 回调
     │                                          │
     │                              taskFutureMap 无 RunningTask?
     │                                          │ 是
     │                                  scheduleRun(key, delay)
     │                                          │
     ▼                                          ▼
   返回                            共享 ExecutorService
                                         │
                                  scheduleProcess(key)
                                  ├── lockRun: copyForUse
                                  ├── processor.run   ← 无锁
                                  └── lockRun: clearData + 再调度
GDP 线程特点:
  • addData 在调用方线程执行,持锁 merge — 调用方是业务线程,可能是多个不同线程并发调用
  • scheduleProcess 在共享线程池执行 — 所有 key 共享,一个 key 慢会影响其他 key
  • 同一 key 的 addData 和 scheduleProcess 可能在不同线程 — 需要 DistributeLocker 保护

3.2 V2 线程模型

业务线程 (多个)
     │
     ▼
 publish() → Disruptor Ring Buffer
                    │
                    ▼
          Disruptor 消费线程 (单个, 固定)
          │ synchronized(lock): merge + 调度判断
          │
          ▼
     Sender-0 / Sender-1 / Sender-2 / Sender-3
     (每个 coreSize=1, hash(key)%4 选择)
     ├── synchronized(lock): copyForUse
     ├── wsPushProcess            ← 无锁
     └── synchronized(lock): mergeRemove
V2 线程特点:
  • merge 在 Disruptor 单消费线程执行 — 天然串行,不存在多线程并发 merge
  • send 在 4个专属单线程执行 — hash 分区,slow key 仅影响同 slot 的 1/4 key
  • merge 线程和 send 线程通过极短的共享锁协调 — 仅 merge/copy/remove 持锁

线程模型对比表

维度GDPV2
merge 线程调用方线程 (多个, 不固定)Disruptor 消费线程 (单个, 固定)
send 线程共享线程池 (所有 key 共享)4 个专属 STPE (hash 分区)
merge 并发度多线程并发 merge 同一 key
(DistributeLocker 保护)
单线程串行 merge
(Disruptor 保证)
key 隔离性差: 共享线程池, slow key 阻塞所有好: slow key 仅影响 1/4
线程切换调用方线程 → 共享池线程Disruptor 线程 → Sender 线程
CPU 缓存亲和差: key 可能在任意线程处理好: 同 key 始终在同一 Sender 线程

4. 锁与同步机制对比

4.1 锁实现对比

维度GDP — DistributeLockerV2 — 共享锁数组
锁类型synchronized(Object)synchronized(Object)
锁数组大小构造时指定 (通常 64-256)固定 64
hash 算法indexForArray: 2的幂用位运算, 否则取模getLock: 扰动(h^h>>>16) + 位运算取模
额外状态AtomicInteger[] waitingCounts 每槽位计数无 (更轻量)
MetricsMC_COMMON_LOCK_WAITING_DISTRIBUTE
MC_COMMON_LOCK_TIME_DISTRIBUTE
无锁 metrics (可按需添加)
内存Object[N] + AtomicInteger[N] ≈ N×48BObject[64] ≈ 64×16B = 1KB
核心相似点:两者都使用 synchronized(Object) + 共享锁数组的模式。V2 精简了 AtomicInteger 等待计数和 Metrics 采集,更轻量。

4.2 单事件周期锁操作量化

一个完整的事件周期 = 数据入队(merge) + 拷贝(copy) + 发送(send) + 清理(remove)。

操作GDPV2说明
merge lockRun
lock → innerAdd → lockRunner → unlock
synchronized
lock → merge + activeSenders判断 → unlock
V2 把 merge 和调度判断合并在同一把锁内
调度判断 在 lockRunner 回调内
taskFutureMap.get (CHM, 无锁)
在 merge 同一把锁内
activeSenders.contains (Set, 无锁)
GDP 用 RunningTask 判断; V2 用 Set 判断
scheduleRun 锁外: executorService.submit
+ new RunningTask + taskFutureMap.put
锁外: senderExecutors[slot].execute
+ activeSenders.add (已在锁内完成)
V2 无需创建 RunningTask 对象
copyForUse lockRun
lock → copyForUse → unlock
synchronized
lock → copyForUse → unlock
完全等价
send 两者相同: 无锁执行 processor.run / wsPushProcess 核心耗时操作, 1-10ms
clearData / mergeRemove lockRun
lock → innerRemove → lockRunner → unlock
synchronized
lock → mergeRemove → unlock
等价; GDP 在 lockRunner 中判断是否再调度
再调度判断 在 clearData 的 lockRunner 内
检查 lockData != null → scheduleRun
在 senderLoop while 循环内
回到循环头部: 锁内检查 isEmpty
GDP 在锁内回调; V2 在循环头部锁内检查
总 synchronized 3 次
add + copyForUse + clearData
3 次
merge + copy + remove
次数完全相同
关键发现:两者的 synchronized 次数完全一致 (3次/事件周期)。差异在于锁的竞争程度和锁内附加操作。

4.3 竞争场景对比

场景 A: 多业务线程并发 addData 同一 key

GDPV2
发生条件多个业务线程同时调用 addData 传入同一 key 的数据不存在此场景 — 所有 merge 都在 Disruptor 单消费线程串行执行
竞争程度中: 多线程争抢同一锁槽位无竞争
影响waitingCount 增加, park/unpark 开销

场景 B: merge 和 send 并发争同一 key

GDPV2
发生条件addData (调用方线程) 和 scheduleProcess (executor线程) 同时操作同一 keyonEvent (Disruptor线程) 和 senderLoop (Sender线程) 同时操作同一 key
竞争程度中: 两个不同线程争 DistributeLocker 同一 slot极低: 仅当 merge 和 copy/remove 恰好同时发生 (各持锁 <1μs)
分析GDP 的 addData 可能被多个调用方线程频繁调用, 竞争频率较高V2 的 merge 在 Disruptor 单线程上, 与 Sender 线程的竞争是 1 vs 1, 且锁持续极短
sequenceDiagram participant BT as 业务线程1 participant BT2 as 业务线程2 participant LK as DistributeLocker participant ET as Executor线程 Note over BT,ET: GDP: 多线程竞争锁 BT->>LK: lockRun(key) 等待 BT2->>LK: lockRun(key) 等待 Note over LK: BT和BT2竞争同一slot! LK->>BT: 获得锁, merge BT->>LK: unlock LK->>BT2: 获得锁, merge BT2->>LK: unlock ET->>LK: lockRun(key) copyForUse Note over ET: 可能与BT3竞争
sequenceDiagram participant DT as Disruptor线程 participant LK as 共享锁 participant ST as Sender线程 Note over DT,ST: V2: 仅两个线程竞争, 且极低频 DT->>LK: synchronized: merge 约300ns DT->>LK: unlock Note over DT,ST: 大部分时间两线程不竞争 ST->>LK: synchronized: copy 约500ns ST->>LK: unlock ST->>ST: wsPushProcess 5ms 无锁! DT->>LK: synchronized: merge 约300ns DT->>LK: unlock Note over DT: merge 和 send 真正并行! ST->>LK: synchronized: remove 约500ns

5. 数据流全路径对比

5.1 数据入队 (add / merge)

GDP: addData

addData(List<T> dataList)
  → 按 key 分组: Map<Object, List<T>>
  → 对每个 key:
      locker.lockRun(key, () -> {
          for (T data : datas) {
              innerAdd(key, data);    // merge 到 dataMap
          }
          lockRunner.run(finalData); // 回调: 检查 taskFutureMap, 首次则 scheduleRun
      });

V2: onEvent

onEvent(PushEvent event, ...)
  → incoming = event.getPushModelTaskGroup()
  → synchronized(getLock(batchKey)) {
        merge incoming 到 batchMap;
        if (!activeSenders.contains(key)) {
            activeSenders.add(key);
            shouldSchedule = true;
        }
    }
  → if (shouldSchedule) senderExecutors[slot].execute(senderLoop);
差异点GDPV2
调用线程调用方业务线程 (可能多个)Disruptor 单消费线程
批量处理支持 List 批量 add, 同 key 在一把锁内多次 innerAdd逐事件处理, 每次 merge 一个 PushModelTaskGroup
调度判断在锁内回调 lockRunner 中检查 taskFutureMap在同一把锁内检查 activeSenders
调度对象RunningTask (封装 Future + identityObj + 计时)无额外对象, 仅 Set 标记 + execute

5.2 发送循环 (scheduleProcess / senderLoop)

GDP: scheduleProcess

scheduleProcess(dataKey, identityObj)
  → copyPushData = dataContainer.copyForUse(dataKey);  // 锁内 copy
  → processor.run(copyPushData);                        // 无锁 send
  → clearPushData = copyPushData;                       // 成功标记
  → dataContainer.clearData(dataKey, clearPushData, lockRunner);
      // 锁内: mergeRemove + 判断是否还有数据
      // lockRunner: 还有数据 → scheduleRun(dataKey, delayTime)
      //             无数据 → taskFutureMap.remove(dataKey)

V2: senderLoop

senderLoop(batchKey, retryCount)
  → while (true):
      synchronized(lock) { copy = agg.copyForUse() }   // 锁内 copy
      if empty → synchronized(lock) { cleanupKey } → return
      wsPushProcess(copy);                               // 无锁 send
      synchronized(lock) { agg.mergeRemove(copy) }      // 锁内 remove
      // while 继续 → 回到锁内 copyForUse 检查
差异点GDPV2
循环方式递归调度: clearData → lockRunner → scheduleRun → executor.submit → scheduleProcesswhile 循环: copy → send → remove → 回到 copy
线程切换每轮可能切换线程 (共享池)始终在同一 Sender 线程 (while 循环)
无数据时延迟delayTime = 1000ms (等待新数据)直接 cleanupKey 退出 (新数据通过 onEvent 重新调度)
每批大小由 copyForUse 决定 (通常全量拷贝)由 copyForUse 决定 (最多 500)
V2 的 while 循环 vs GDP 的递归调度:
GDP 每发完一批,通过 clearData 的 lockRunner 回调再次 scheduleRun,经 executor.submit 提交下一轮。中间有线程池调度开销 (~1-5μs)。
V2 直接 while 循环,无线程切换,无调度开销。同一 Sender 线程连续处理同一 key 的多批数据,CPU 缓存更友好。

5.3 并行时序对比

GDP 时序

sequenceDiagram participant BT as 业务线程 participant GDC as GroupDataContainer participant ET as Executor线程 BT->>GDC: lockRun: add(v1,v2,v3) BT->>GDC: unlock Note over GDC: 首次 → scheduleRun ET->>GDC: lockRun: copyForUse ET->>GDC: unlock ET->>ET: processor.run(v1,v2,v3) 无锁 5ms par 业务线程并行merge BT->>GDC: lockRun: add(v4,v5) BT->>GDC: unlock and Executor线程并行send ET->>ET: 仍在 processor.run... 无锁 end ET->>GDC: lockRun: clearData(v1,v2,v3) Note over GDC: 剩余v4,v5 → 再调度 ET->>GDC: unlock

V2 时序

sequenceDiagram participant DT as Disruptor线程 participant BM as batchMap participant ST as Sender线程 DT->>BM: sync: merge(v1,v2,v3) Note over BM: 首次 → execute senderLoop ST->>BM: sync: copyForUse ST->>ST: wsPushProcess(v1,v2,v3) 无锁 5ms par Disruptor并行merge DT->>BM: sync: merge(v4,v5) and Sender并行send DT->>BM: sync: merge(v6) Note over ST: 仍在 wsPushProcess... end ST->>BM: sync: mergeRemove(v1,v2,v3) Note over BM: 剩余v4,v5,v6 ST->>BM: sync: copyForUse(v4,v5,v6) ST->>ST: wsPushProcess(v4,v5,v6)
两者的并行效果完全一致:send 期间不持锁,merge 可并行进行。关键区别在于:GDP 的 merge 来自多个业务线程(需要应对竞争),V2 的 merge 来自 Disruptor 单线程(天然无竞争),锁的实际竞争概率 V2 更低。

6. 重试机制对比

维度GDPV2
失败处理processor.run 异常被 catch
→ clearPushData 置 null
→ clearData(key, null, lockRunner)
→ 不执行 mergeRemove (数据保留)
wsPushProcess 异常被 catch
→ 不执行 mergeRemove (数据保留)
→ scheduleRetry
重试延迟固定 1000ms
clearData(null) → lockRunner 判断有数据 → scheduleRun(key, 1000ms)
指数退避: 200, 400, 800, ..., 最大 30s
最大重试无限制 (无告警)
只要有数据就一直调度
可配置 (默认 2000), 超过后仍重试但触发严重告警
重试期间积攒是: addData 不受影响是: onEvent 不受影响, activeSenders 保持 true
重试成功后clearData 清理已发送 → 有更多数据 → 再调度retryCount 归零 → while 循环继续 drain (每批 500)
超时保护invokeTimeOut: RunningTask 运行时间检查
removeTimeOut: 可选移除超时任务
无任务超时 (依赖 wsPushProcess 自身超时)
可按需在 wsPushProcess 中设置
graph LR subgraph "GDP 重试路径" A1["processor.run 失败"] --> B1["clearData(null)"] B1 --> C1["lockRunner: 有数据"] C1 --> D1["scheduleRun(key, 1000ms)"] D1 --> E1["1s后 scheduleProcess"] E1 --> F1["copyForUse → send"] end subgraph "V2 重试路径" A2["wsPushProcess 失败"] --> B2["scheduleRetry(key, count+1)"] B2 --> C2["schedule到同一Sender线程"] C2 --> D2["指数退避等待"] D2 --> E2["senderLoop(key, retryCount)"] E2 --> F2["copyForUse → send"] end style D1 fill:#fef3c7 style D2 fill:#dbeafe
GDP 重试的局限:固定 1s 延迟, 无退避。下游持续故障时,每秒重试一次,可能加剧压力。V2 的指数退避 (200ms → 30s) 给下游恢复时间,且超过阈值后触发告警提醒运维介入。

7. 性能量化对比

7.1 每事件周期操作计数

操作类型GDPV2差异
synchronized 次数33持平
CHM 操作 (get/put/remove)~5-7
dataMap + taskFutureMap
~3-4
batchMap + activeSenders
V2 更少
对象分配RunningTask + Future + AtomicReference
每次 scheduleRun ~3 个对象
Runnable lambda
每次 execute ~1 个对象
V2 更少
AtomicInteger 操作2× increment/decrement
waitingCounts
1-2× addAndGet
bufferedItemCount
持平
Metrics 采样 (锁内)
summary + time (每次 lockRun)
0V2 更少

7.2 内存开销

组件GDP 每key开销V2 每key开销
主 Map entryCHM.Node ~32B (dataMap)CHM.Node ~32B (batchMap)
调度状态RunningTask ~40B + CHM.Node ~32B (taskFutureMap)CHM.Node ~16B (activeSenders, 仅 key)
锁结构DistributeLocker: 共享, 摊薄 ~4-8B/keyObject[64]: 共享, 摊薄 ~1B/key
总计/key~108B~49B
1000 key~105 KB~48 KB (降 54%)
100K key~10.5 MB~4.8 MB (降 54%)
V2 无需 RunningTask 对象 (封装 Future + identityObj + beginTime + secondFutureRef),仅用 Set 标记 key 的有无。内存减半。

7.3 延迟分析

延迟维度GDPV2
首次事件到发送 addData 锁 → scheduleRun(0) → executor 调度 → scheduleProcess
~2-10μs (含线程池调度)
onEvent 锁 → execute → senderLoop
~1-5μs (单线程 STPE, 调度更快)
批间延迟
(两批之间)
clearData.lockRunner → scheduleRun → executor.submit → 线程调度 → scheduleProcess
~2-10μs (递归调度, 含线程切换)
while 循环回到头部
~0.1μs (无线程切换)
merge 锁等待 可能: 多业务线程争同一 slot
~0.1-10μs (取决于竞争)
极低: Disruptor 单线程 vs Sender 线程
~0-0.5μs (偶尔碰撞)
无数据等待 1000ms 固定延迟
copyForUse 返回 null → delayTime = 1000ms
0ms 立即退出
cleanupKey → return; 新数据到来时重新调度
重试延迟 固定 1000ms 指数退避 200ms → 30s
V2 的批间延迟优势显著:GDP 每批发完需要递归调度 (锁→回调→scheduleRun→executor.submit→scheduleProcess), 约 2-10μs。V2 的 while 循环回到头部仅需 ~0.1μs。高吞吐场景下,100K 批次可节省 ~200ms-1s 的调度累计延迟。

7.4 吞吐量对比

瓶颈分析

graph LR subgraph "GDP 瓶颈链" GA["多线程争锁
DistributeLocker"] --> GB["共享线程池调度
executor.submit"] GB --> GC["scheduleProcess
3次lockRun"] GC --> GD["processor.run
业务发送"] GD --> GE["递归调度
再次submit"] end subgraph "V2 瓶颈链" VA["Disruptor单线程
merge ~300ns"] --> VB["Sender线程
while循环"] VB --> VC["copy ~500ns
+ send 无锁"] VC --> VD["remove ~500ns"] VD --> VB end style GA fill:#fef3c7,stroke:#92400e style GB fill:#fef3c7,stroke:#92400e style GE fill:#fef3c7,stroke:#92400e style VA fill:#dbeafe,stroke:#1e40af style VC fill:#ecfdf5,stroke:#059669
吞吐指标GDPV2
事件入口吞吐 受限于调用方线程竞争锁
多线程 add 同一 slot 时排队
Disruptor 单线程串行, 无竞争
瓶颈在 Ring Buffer 容量
send 调度开销 ~2-10μs/批
executor.submit + 线程调度
~0.1μs/批
while 循环, 无调度
key 隔离 差: 共享线程池, slow key 堵住调度队列 好: 4 个独立 Sender, slow key 仅影响 1/4
CPU 缓存命中 中: key 可能在不同线程处理 好: 同 key 始终在同一 Sender, 热数据常驻 L1/L2
理论最大 drain 速率
(假设 send 为 0ms)
~100K-300K batches/s
受 lockRun + submit 开销限制
~500K-1M batches/s
仅 synchronized + while 循环

8. 场景化深度分析

场景 A: 正常低频 (1K key, 1K events/sec)

GDPV2
锁竞争极低极低
内存~105KB~48KB
P99 延迟~2ms~2ms
结论两者表现接近, V2 内存更优

场景 B: 高频突发 (100 key, 500K events/sec)

GDPV2
入口竞争高: 多业务线程争 DistributeLocker
waitingCount 飙升
无: Disruptor 单线程 merge
调度开销高: 频繁 executor.submit + scheduleRun
RunningTask 对象分配频繁
低: while 循环, 无额外调度
批间延迟累积1000 批 × ~5μs 调度 = ~5ms1000 批 × ~0.1μs = ~0.1ms
P99 延迟~10-30ms~2-5ms

场景 C: 网络抖动 (wsPushProcess 间歇慢 50ms)

GDPV2
send 期间聚合是: addData 不阻塞是: onEvent 不阻塞
聚合效率好: 50ms 内多次 addData merge好: 50ms 内多次 onEvent merge
重试行为固定 1s 重试, 可能过慢200ms 首次重试, 退避到 30s
slow key 影响共享线程池, 堵住其他 key 调度仅影响同 slot 的 1/4 key

场景 D: 大量 key (100K key, 低频)

GDPV2
内存~10.5MB~4.8MB (降 54%)
GC 压力中: RunningTask + Future 频繁分配/回收低: 无 RunningTask 分配
无数据时每 key 延迟 1s 再检查 → 100K 个 1s 定时任务在线程池中立即 cleanupKey 退出 → 无残留任务
key 回收copyForUse(null) → 1s 后再查 → 最终 taskFutureMap.removesenderLoop isEmpty → 立即 cleanupKey

场景 E: 链路长时间故障 (wsPushProcess 持续失败 10 分钟)

GDPV2
重试频率每 1s 重试 → 10 分钟 = 600 次重试
持续施压下游
退避到 30s → 10 分钟 ≈ 20 次重试
给下游恢复空间
积攒效果正常: addData 不受影响正常: onEvent 不受影响
恢复后立即 drain (copyForUse 全量)立即 drain (每批 500, while 循环)
告警无 (无告警机制)retryCount > 10 告警; > 2000 严重告警
数据安全不丢 (buffer 保持)不丢 (buffer 保持, 无限重试)

9. 结论

9.1 相同点 (继承自 GDP 的核心模式)

核心模式锁内 merge → 锁内 copy → 无锁 send → 锁内 remove → 循环
并行能力merge 和 send 真正并行 (send 期间不持锁)
锁类型共享锁数组 + synchronized
synchronized 次数3 次 / 事件周期
数据安全发送失败 → 数据保留 buffer → 重试

9.2 V2 相对 GDP 的改进

维度GDPV2 改进收益
merge 竞争多业务线程争锁Disruptor 单线程, 无争锁merge 路径零竞争
批间调度递归 scheduleRun 经 executor.submitwhile 循环, 无线程切换批间延迟降 20-100 倍
key 隔离共享线程池4 个专属 STPE, hash 分区slow key 影响范围缩小到 1/4
CPU 缓存key 可能在不同线程处理同 key 始终在同一线程L1/L2 缓存命中提升
对象分配RunningTask + Future + AtomicReference / 调度仅 Runnable lambda / 调度GC 压力降低
内存 / key~108B~49B降 54%
重试策略固定 1s, 无退避, 无告警指数退避 200ms→30s, 可配置次数, 多级告警更优雅, 可观测
无数据 key1s 延迟后再检查立即退出, 新数据重新调度无残留定时任务
背压依赖 executor 队列Disruptor Ring Buffer (固定大小, 天然背压)更可控

9.3 GDP 的优势 (V2 未覆盖)

维度GDP 优势V2 情况
通用性通用框架, IGroupData 接口, 任何聚合场景WebSocket 推送专用
批量 addaddData(List) 同 key 在一把锁内多次 merge逐事件 merge (Disruptor 模型)
锁监控waitingCounts + Metrics (lock waiting, lock time)无 (可按需添加)
超时保护invokeTimeOut: RunningTask 超时检查 + 可选移除无 (依赖 wsPushProcess 自身超时)
processorP1 返回值支持 processor 返回 nextDelayTime 动态控制间隔无 (固定 while 循环)
拒绝策略setReject: maxSize 限制 + rejectHandler 回调无 (Disruptor Ring Buffer 背压替代)

9.4 一句话总结

V2 在继承 GDP "边攒边发" 核心模式的基础上,通过 Disruptor 单消费线程消除 merge 竞争、while 循环消除批间调度开销、hash 分区 STPE 实现 key 隔离、指数退避增强重试 — 以专用化设计换取在 WebSocket 推送场景下的更优性能。

V2 PushEventHandler vs GroupDataProcessor 深度对比 — 2026-03-20