1. 对比背景
GroupDataProcessor (GDP) 是项目中已有的通用异步"边攒边发"框架,基于 GroupDataContainer + DistributeLocker + 共享线程池实现。
V2 PushEventHandler 是专为 WebSocket 推送引擎设计的新方案,基于 Disruptor + 共享锁数组 + 专属 Sender 线程池实现。
两者都实现了"边合并边发送"的核心模式。本文从架构、线程模型、锁机制、数据流、重试、性能六个维度进行深度对比。
核心共识:两者都遵循同一模式 — 锁内merge、锁内copy、无锁send、锁内remove。差异在于:锁的实现、线程的归属、调度的机制、重试的策略。
2. 架构原理对比
2.1 GroupDataProcessor 架构
graph TD
subgraph 调用方线程
C1["业务线程 1"]
C2["业务线程 N"]
end
subgraph GroupDataProcessor
GDC["GroupDataContainer
ConcurrentHashMap + DistributeLocker"] TFM["taskFutureMap
ConcurrentHashMap: key → RunningTask"] end subgraph 执行层 SE["共享 ScheduledExecutorService
或 ExecutorService"] SP["scheduleProcess
copyForUse → process → clearData"] end C1 -->|"addData(list)"| GDC C2 -->|"addData(list)"| GDC GDC -->|"lockRunner 回调
检查 taskFutureMap"| TFM TFM -->|"无 RunningTask → scheduleRun"| SE SE --> SP SP -->|"lockRun: copyForUse"| GDC SP -->|"processor.run (无锁)"| WSP["业务处理"] SP -->|"lockRun: clearData"| GDC SP -->|"还有数据 → 再次 scheduleRun"| SE style GDC fill:#fef3c7,stroke:#92400e style WSP fill:#ecfdf5,stroke:#059669
ConcurrentHashMap + DistributeLocker"] TFM["taskFutureMap
ConcurrentHashMap: key → RunningTask"] end subgraph 执行层 SE["共享 ScheduledExecutorService
或 ExecutorService"] SP["scheduleProcess
copyForUse → process → clearData"] end C1 -->|"addData(list)"| GDC C2 -->|"addData(list)"| GDC GDC -->|"lockRunner 回调
检查 taskFutureMap"| TFM TFM -->|"无 RunningTask → scheduleRun"| SE SE --> SP SP -->|"lockRun: copyForUse"| GDC SP -->|"processor.run (无锁)"| WSP["业务处理"] SP -->|"lockRun: clearData"| GDC SP -->|"还有数据 → 再次 scheduleRun"| SE style GDC fill:#fef3c7,stroke:#92400e style WSP fill:#ecfdf5,stroke:#059669
2.2 V2 PushEventHandler 架构
graph TD
subgraph 生产者
P1["业务线程 1"]
P2["业务线程 N"]
end
subgraph Disruptor层
RB["Ring Buffer 32768"]
DT["Disruptor消费线程
onEvent: 锁内merge + 调度判断"] end subgraph 共享状态 BM["batchMap
ConcurrentHashMap"] LK["Object locks x64"] AS["activeSenders
ConcurrentHashSet"] end subgraph Sender层 S0["Sender-0 STPE coreSize=1"] S1["Sender-1"] S2["Sender-2"] S3["Sender-3"] end P1 -->|publish| RB P2 -->|publish| RB RB --> DT DT -->|"synchronized: merge"| BM DT -->|"首次key → execute"| S0 S0 -->|"synchronized: copy"| BM S0 -->|"无锁发送"| WSP["wsPushProcess"] S0 -->|"synchronized: remove"| BM S0 -->|"失败 → schedule重试"| S0 style DT fill:#dbeafe,stroke:#1e40af style WSP fill:#ecfdf5,stroke:#059669 style LK fill:#fce7f3,stroke:#be185d
onEvent: 锁内merge + 调度判断"] end subgraph 共享状态 BM["batchMap
ConcurrentHashMap"] LK["Object locks x64"] AS["activeSenders
ConcurrentHashSet"] end subgraph Sender层 S0["Sender-0 STPE coreSize=1"] S1["Sender-1"] S2["Sender-2"] S3["Sender-3"] end P1 -->|publish| RB P2 -->|publish| RB RB --> DT DT -->|"synchronized: merge"| BM DT -->|"首次key → execute"| S0 S0 -->|"synchronized: copy"| BM S0 -->|"无锁发送"| WSP["wsPushProcess"] S0 -->|"synchronized: remove"| BM S0 -->|"失败 → schedule重试"| S0 style DT fill:#dbeafe,stroke:#1e40af style WSP fill:#ecfdf5,stroke:#059669 style LK fill:#fce7f3,stroke:#be185d
2.3 核心架构差异
| 维度 | GDP | V2 |
|---|---|---|
| 事件入口 | 直接方法调用 addData() | Disruptor Ring Buffer → onEvent() |
| 缓冲容器 | GroupDataContainer (ConcurrentHashMap + DistributeLocker) | batchMap (ConcurrentHashMap) + 64个 Object 锁 |
| 调度状态 | taskFutureMap: key → RunningTask (Future + identityObj) | activeSenders: ConcurrentHashSet (仅存 key 的有无) |
| 执行线程 | 共享 ExecutorService (所有 key 共享) | 4 个专属 coreSize=1 STPE (hash 分区) |
| 背压机制 | 依赖 executor 队列大小 | Disruptor Ring Buffer (固定 32768) + Sender 队列 |
3. 线程模型对比
3.1 GDP 线程模型
调用方线程 (业务线程, 可能很多)
│
▼
addData() → DistributeLocker.lockRun(key) → merge + 回调
│ │
│ taskFutureMap 无 RunningTask?
│ │ 是
│ scheduleRun(key, delay)
│ │
▼ ▼
返回 共享 ExecutorService
│
scheduleProcess(key)
├── lockRun: copyForUse
├── processor.run ← 无锁
└── lockRun: clearData + 再调度
GDP 线程特点:
- addData 在调用方线程执行,持锁 merge — 调用方是业务线程,可能是多个不同线程并发调用
- scheduleProcess 在共享线程池执行 — 所有 key 共享,一个 key 慢会影响其他 key
- 同一 key 的 addData 和 scheduleProcess 可能在不同线程 — 需要 DistributeLocker 保护
3.2 V2 线程模型
业务线程 (多个)
│
▼
publish() → Disruptor Ring Buffer
│
▼
Disruptor 消费线程 (单个, 固定)
│ synchronized(lock): merge + 调度判断
│
▼
Sender-0 / Sender-1 / Sender-2 / Sender-3
(每个 coreSize=1, hash(key)%4 选择)
├── synchronized(lock): copyForUse
├── wsPushProcess ← 无锁
└── synchronized(lock): mergeRemove
V2 线程特点:
- merge 在 Disruptor 单消费线程执行 — 天然串行,不存在多线程并发 merge
- send 在 4个专属单线程执行 — hash 分区,slow key 仅影响同 slot 的 1/4 key
- merge 线程和 send 线程通过极短的共享锁协调 — 仅 merge/copy/remove 持锁
线程模型对比表
| 维度 | GDP | V2 |
|---|---|---|
| merge 线程 | 调用方线程 (多个, 不固定) | Disruptor 消费线程 (单个, 固定) |
| send 线程 | 共享线程池 (所有 key 共享) | 4 个专属 STPE (hash 分区) |
| merge 并发度 | 多线程并发 merge 同一 key (DistributeLocker 保护) | 单线程串行 merge (Disruptor 保证) |
| key 隔离性 | 差: 共享线程池, slow key 阻塞所有 | 好: slow key 仅影响 1/4 |
| 线程切换 | 调用方线程 → 共享池线程 | Disruptor 线程 → Sender 线程 |
| CPU 缓存亲和 | 差: key 可能在任意线程处理 | 好: 同 key 始终在同一 Sender 线程 |
4. 锁与同步机制对比
4.1 锁实现对比
| 维度 | GDP — DistributeLocker | V2 — 共享锁数组 |
|---|---|---|
| 锁类型 | synchronized(Object) | synchronized(Object) |
| 锁数组大小 | 构造时指定 (通常 64-256) | 固定 64 |
| hash 算法 | indexForArray: 2的幂用位运算, 否则取模 | getLock: 扰动(h^h>>>16) + 位运算取模 |
| 额外状态 | AtomicInteger[] waitingCounts 每槽位计数 | 无 (更轻量) |
| Metrics | MC_COMMON_LOCK_WAITING_DISTRIBUTE MC_COMMON_LOCK_TIME_DISTRIBUTE | 无锁 metrics (可按需添加) |
| 内存 | Object[N] + AtomicInteger[N] ≈ N×48B | Object[64] ≈ 64×16B = 1KB |
核心相似点:两者都使用
synchronized(Object) + 共享锁数组的模式。V2 精简了 AtomicInteger 等待计数和 Metrics 采集,更轻量。
4.2 单事件周期锁操作量化
一个完整的事件周期 = 数据入队(merge) + 拷贝(copy) + 发送(send) + 清理(remove)。
| 操作 | GDP | V2 | 说明 |
|---|---|---|---|
| merge | 1× lockRunlock → innerAdd → lockRunner → unlock |
1× synchronizedlock → merge + activeSenders判断 → unlock |
V2 把 merge 和调度判断合并在同一把锁内 |
| 调度判断 | 在 lockRunner 回调内 taskFutureMap.get (CHM, 无锁) |
在 merge 同一把锁内 activeSenders.contains (Set, 无锁) |
GDP 用 RunningTask 判断; V2 用 Set 判断 |
| scheduleRun | 锁外: executorService.submit + new RunningTask + taskFutureMap.put |
锁外: senderExecutors[slot].execute + activeSenders.add (已在锁内完成) |
V2 无需创建 RunningTask 对象 |
| copyForUse | 1× lockRunlock → copyForUse → unlock |
1× synchronizedlock → copyForUse → unlock |
完全等价 |
| send | 两者相同: 无锁执行 processor.run / wsPushProcess | 核心耗时操作, 1-10ms | |
| clearData / mergeRemove | 1× lockRunlock → innerRemove → lockRunner → unlock |
1× synchronizedlock → mergeRemove → unlock |
等价; GDP 在 lockRunner 中判断是否再调度 |
| 再调度判断 | 在 clearData 的 lockRunner 内 检查 lockData != null → scheduleRun |
在 senderLoop while 循环内 回到循环头部: 锁内检查 isEmpty |
GDP 在锁内回调; V2 在循环头部锁内检查 |
| 总 synchronized | 3 次 add + copyForUse + clearData |
3 次 merge + copy + remove |
次数完全相同 |
关键发现:两者的 synchronized 次数完全一致 (3次/事件周期)。差异在于锁的竞争程度和锁内附加操作。
4.3 竞争场景对比
场景 A: 多业务线程并发 addData 同一 key
| GDP | V2 | |
|---|---|---|
| 发生条件 | 多个业务线程同时调用 addData 传入同一 key 的数据 | 不存在此场景 — 所有 merge 都在 Disruptor 单消费线程串行执行 |
| 竞争程度 | 中: 多线程争抢同一锁槽位 | 无竞争 |
| 影响 | waitingCount 增加, park/unpark 开销 | 无 |
场景 B: merge 和 send 并发争同一 key
| GDP | V2 | |
|---|---|---|
| 发生条件 | addData (调用方线程) 和 scheduleProcess (executor线程) 同时操作同一 key | onEvent (Disruptor线程) 和 senderLoop (Sender线程) 同时操作同一 key |
| 竞争程度 | 中: 两个不同线程争 DistributeLocker 同一 slot | 极低: 仅当 merge 和 copy/remove 恰好同时发生 (各持锁 <1μs) |
| 分析 | GDP 的 addData 可能被多个调用方线程频繁调用, 竞争频率较高 | V2 的 merge 在 Disruptor 单线程上, 与 Sender 线程的竞争是 1 vs 1, 且锁持续极短 |
sequenceDiagram
participant BT as 业务线程1
participant BT2 as 业务线程2
participant LK as DistributeLocker
participant ET as Executor线程
Note over BT,ET: GDP: 多线程竞争锁
BT->>LK: lockRun(key) 等待
BT2->>LK: lockRun(key) 等待
Note over LK: BT和BT2竞争同一slot!
LK->>BT: 获得锁, merge
BT->>LK: unlock
LK->>BT2: 获得锁, merge
BT2->>LK: unlock
ET->>LK: lockRun(key) copyForUse
Note over ET: 可能与BT3竞争
sequenceDiagram
participant DT as Disruptor线程
participant LK as 共享锁
participant ST as Sender线程
Note over DT,ST: V2: 仅两个线程竞争, 且极低频
DT->>LK: synchronized: merge 约300ns
DT->>LK: unlock
Note over DT,ST: 大部分时间两线程不竞争
ST->>LK: synchronized: copy 约500ns
ST->>LK: unlock
ST->>ST: wsPushProcess 5ms 无锁!
DT->>LK: synchronized: merge 约300ns
DT->>LK: unlock
Note over DT: merge 和 send 真正并行!
ST->>LK: synchronized: remove 约500ns
5. 数据流全路径对比
5.1 数据入队 (add / merge)
GDP: addData
addData(List<T> dataList)
→ 按 key 分组: Map<Object, List<T>>
→ 对每个 key:
locker.lockRun(key, () -> {
for (T data : datas) {
innerAdd(key, data); // merge 到 dataMap
}
lockRunner.run(finalData); // 回调: 检查 taskFutureMap, 首次则 scheduleRun
});
V2: onEvent
onEvent(PushEvent event, ...)
→ incoming = event.getPushModelTaskGroup()
→ synchronized(getLock(batchKey)) {
merge incoming 到 batchMap;
if (!activeSenders.contains(key)) {
activeSenders.add(key);
shouldSchedule = true;
}
}
→ if (shouldSchedule) senderExecutors[slot].execute(senderLoop);
| 差异点 | GDP | V2 |
|---|---|---|
| 调用线程 | 调用方业务线程 (可能多个) | Disruptor 单消费线程 |
| 批量处理 | 支持 List 批量 add, 同 key 在一把锁内多次 innerAdd | 逐事件处理, 每次 merge 一个 PushModelTaskGroup |
| 调度判断 | 在锁内回调 lockRunner 中检查 taskFutureMap | 在同一把锁内检查 activeSenders |
| 调度对象 | RunningTask (封装 Future + identityObj + 计时) | 无额外对象, 仅 Set 标记 + execute |
5.2 发送循环 (scheduleProcess / senderLoop)
GDP: scheduleProcess
scheduleProcess(dataKey, identityObj)
→ copyPushData = dataContainer.copyForUse(dataKey); // 锁内 copy
→ processor.run(copyPushData); // 无锁 send
→ clearPushData = copyPushData; // 成功标记
→ dataContainer.clearData(dataKey, clearPushData, lockRunner);
// 锁内: mergeRemove + 判断是否还有数据
// lockRunner: 还有数据 → scheduleRun(dataKey, delayTime)
// 无数据 → taskFutureMap.remove(dataKey)
V2: senderLoop
senderLoop(batchKey, retryCount)
→ while (true):
synchronized(lock) { copy = agg.copyForUse() } // 锁内 copy
if empty → synchronized(lock) { cleanupKey } → return
wsPushProcess(copy); // 无锁 send
synchronized(lock) { agg.mergeRemove(copy) } // 锁内 remove
// while 继续 → 回到锁内 copyForUse 检查
| 差异点 | GDP | V2 |
|---|---|---|
| 循环方式 | 递归调度: clearData → lockRunner → scheduleRun → executor.submit → scheduleProcess | while 循环: copy → send → remove → 回到 copy |
| 线程切换 | 每轮可能切换线程 (共享池) | 始终在同一 Sender 线程 (while 循环) |
| 无数据时延迟 | delayTime = 1000ms (等待新数据) | 直接 cleanupKey 退出 (新数据通过 onEvent 重新调度) |
| 每批大小 | 由 copyForUse 决定 (通常全量拷贝) | 由 copyForUse 决定 (最多 500) |
V2 的 while 循环 vs GDP 的递归调度:
GDP 每发完一批,通过 clearData 的 lockRunner 回调再次 scheduleRun,经 executor.submit 提交下一轮。中间有线程池调度开销 (~1-5μs)。
V2 直接 while 循环,无线程切换,无调度开销。同一 Sender 线程连续处理同一 key 的多批数据,CPU 缓存更友好。
GDP 每发完一批,通过 clearData 的 lockRunner 回调再次 scheduleRun,经 executor.submit 提交下一轮。中间有线程池调度开销 (~1-5μs)。
V2 直接 while 循环,无线程切换,无调度开销。同一 Sender 线程连续处理同一 key 的多批数据,CPU 缓存更友好。
5.3 并行时序对比
GDP 时序
sequenceDiagram
participant BT as 业务线程
participant GDC as GroupDataContainer
participant ET as Executor线程
BT->>GDC: lockRun: add(v1,v2,v3)
BT->>GDC: unlock
Note over GDC: 首次 → scheduleRun
ET->>GDC: lockRun: copyForUse
ET->>GDC: unlock
ET->>ET: processor.run(v1,v2,v3) 无锁 5ms
par 业务线程并行merge
BT->>GDC: lockRun: add(v4,v5)
BT->>GDC: unlock
and Executor线程并行send
ET->>ET: 仍在 processor.run... 无锁
end
ET->>GDC: lockRun: clearData(v1,v2,v3)
Note over GDC: 剩余v4,v5 → 再调度
ET->>GDC: unlock
V2 时序
sequenceDiagram
participant DT as Disruptor线程
participant BM as batchMap
participant ST as Sender线程
DT->>BM: sync: merge(v1,v2,v3)
Note over BM: 首次 → execute senderLoop
ST->>BM: sync: copyForUse
ST->>ST: wsPushProcess(v1,v2,v3) 无锁 5ms
par Disruptor并行merge
DT->>BM: sync: merge(v4,v5)
and Sender并行send
DT->>BM: sync: merge(v6)
Note over ST: 仍在 wsPushProcess...
end
ST->>BM: sync: mergeRemove(v1,v2,v3)
Note over BM: 剩余v4,v5,v6
ST->>BM: sync: copyForUse(v4,v5,v6)
ST->>ST: wsPushProcess(v4,v5,v6)
两者的并行效果完全一致:send 期间不持锁,merge 可并行进行。关键区别在于:GDP 的 merge 来自多个业务线程(需要应对竞争),V2 的 merge 来自 Disruptor 单线程(天然无竞争),锁的实际竞争概率 V2 更低。
6. 重试机制对比
| 维度 | GDP | V2 |
|---|---|---|
| 失败处理 | processor.run 异常被 catch → clearPushData 置 null → clearData(key, null, lockRunner) → 不执行 mergeRemove (数据保留) |
wsPushProcess 异常被 catch → 不执行 mergeRemove (数据保留) → scheduleRetry |
| 重试延迟 | 固定 1000ms clearData(null) → lockRunner 判断有数据 → scheduleRun(key, 1000ms) |
指数退避: 200, 400, 800, ..., 最大 30s |
| 最大重试 | 无限制 (无告警) 只要有数据就一直调度 |
可配置 (默认 2000), 超过后仍重试但触发严重告警 |
| 重试期间积攒 | 是: addData 不受影响 | 是: onEvent 不受影响, activeSenders 保持 true |
| 重试成功后 | clearData 清理已发送 → 有更多数据 → 再调度 | retryCount 归零 → while 循环继续 drain (每批 500) |
| 超时保护 | invokeTimeOut: RunningTask 运行时间检查 removeTimeOut: 可选移除超时任务 |
无任务超时 (依赖 wsPushProcess 自身超时) 可按需在 wsPushProcess 中设置 |
graph LR
subgraph "GDP 重试路径"
A1["processor.run 失败"] --> B1["clearData(null)"]
B1 --> C1["lockRunner: 有数据"]
C1 --> D1["scheduleRun(key, 1000ms)"]
D1 --> E1["1s后 scheduleProcess"]
E1 --> F1["copyForUse → send"]
end
subgraph "V2 重试路径"
A2["wsPushProcess 失败"] --> B2["scheduleRetry(key, count+1)"]
B2 --> C2["schedule到同一Sender线程"]
C2 --> D2["指数退避等待"]
D2 --> E2["senderLoop(key, retryCount)"]
E2 --> F2["copyForUse → send"]
end
style D1 fill:#fef3c7
style D2 fill:#dbeafe
GDP 重试的局限:固定 1s 延迟, 无退避。下游持续故障时,每秒重试一次,可能加剧压力。V2 的指数退避 (200ms → 30s) 给下游恢复时间,且超过阈值后触发告警提醒运维介入。
7. 性能量化对比
7.1 每事件周期操作计数
| 操作类型 | GDP | V2 | 差异 |
|---|---|---|---|
| synchronized 次数 | 3 | 3 | 持平 |
| CHM 操作 (get/put/remove) | ~5-7 dataMap + taskFutureMap | ~3-4 batchMap + activeSenders | V2 更少 |
| 对象分配 | RunningTask + Future + AtomicReference 每次 scheduleRun ~3 个对象 | Runnable lambda 每次 execute ~1 个对象 | V2 更少 |
| AtomicInteger 操作 | 2× increment/decrement waitingCounts | 1-2× addAndGet bufferedItemCount | 持平 |
| Metrics 采样 (锁内) | 2× summary + time (每次 lockRun) | 0 | V2 更少 |
7.2 内存开销
| 组件 | GDP 每key开销 | V2 每key开销 |
|---|---|---|
| 主 Map entry | CHM.Node ~32B (dataMap) | CHM.Node ~32B (batchMap) |
| 调度状态 | RunningTask ~40B + CHM.Node ~32B (taskFutureMap) | CHM.Node ~16B (activeSenders, 仅 key) |
| 锁结构 | DistributeLocker: 共享, 摊薄 ~4-8B/key | Object[64]: 共享, 摊薄 ~1B/key |
| 总计/key | ~108B | ~49B |
| 1000 key | ~105 KB | ~48 KB (降 54%) |
| 100K key | ~10.5 MB | ~4.8 MB (降 54%) |
V2 无需 RunningTask 对象 (封装 Future + identityObj + beginTime + secondFutureRef),仅用 Set 标记 key 的有无。内存减半。
7.3 延迟分析
| 延迟维度 | GDP | V2 |
|---|---|---|
| 首次事件到发送 | addData 锁 → scheduleRun(0) → executor 调度 → scheduleProcess ~2-10μs (含线程池调度) |
onEvent 锁 → execute → senderLoop ~1-5μs (单线程 STPE, 调度更快) |
| 批间延迟 (两批之间) |
clearData.lockRunner → scheduleRun → executor.submit → 线程调度 → scheduleProcess ~2-10μs (递归调度, 含线程切换) |
while 循环回到头部 ~0.1μs (无线程切换) |
| merge 锁等待 | 可能: 多业务线程争同一 slot ~0.1-10μs (取决于竞争) |
极低: Disruptor 单线程 vs Sender 线程 ~0-0.5μs (偶尔碰撞) |
| 无数据等待 | 1000ms 固定延迟 copyForUse 返回 null → delayTime = 1000ms |
0ms 立即退出 cleanupKey → return; 新数据到来时重新调度 |
| 重试延迟 | 固定 1000ms | 指数退避 200ms → 30s |
V2 的批间延迟优势显著:GDP 每批发完需要递归调度 (锁→回调→scheduleRun→executor.submit→scheduleProcess), 约 2-10μs。V2 的 while 循环回到头部仅需 ~0.1μs。高吞吐场景下,100K 批次可节省 ~200ms-1s 的调度累计延迟。
7.4 吞吐量对比
瓶颈分析
graph LR
subgraph "GDP 瓶颈链"
GA["多线程争锁
DistributeLocker"] --> GB["共享线程池调度
executor.submit"] GB --> GC["scheduleProcess
3次lockRun"] GC --> GD["processor.run
业务发送"] GD --> GE["递归调度
再次submit"] end subgraph "V2 瓶颈链" VA["Disruptor单线程
merge ~300ns"] --> VB["Sender线程
while循环"] VB --> VC["copy ~500ns
+ send 无锁"] VC --> VD["remove ~500ns"] VD --> VB end style GA fill:#fef3c7,stroke:#92400e style GB fill:#fef3c7,stroke:#92400e style GE fill:#fef3c7,stroke:#92400e style VA fill:#dbeafe,stroke:#1e40af style VC fill:#ecfdf5,stroke:#059669
DistributeLocker"] --> GB["共享线程池调度
executor.submit"] GB --> GC["scheduleProcess
3次lockRun"] GC --> GD["processor.run
业务发送"] GD --> GE["递归调度
再次submit"] end subgraph "V2 瓶颈链" VA["Disruptor单线程
merge ~300ns"] --> VB["Sender线程
while循环"] VB --> VC["copy ~500ns
+ send 无锁"] VC --> VD["remove ~500ns"] VD --> VB end style GA fill:#fef3c7,stroke:#92400e style GB fill:#fef3c7,stroke:#92400e style GE fill:#fef3c7,stroke:#92400e style VA fill:#dbeafe,stroke:#1e40af style VC fill:#ecfdf5,stroke:#059669
| 吞吐指标 | GDP | V2 |
|---|---|---|
| 事件入口吞吐 | 受限于调用方线程竞争锁 多线程 add 同一 slot 时排队 |
Disruptor 单线程串行, 无竞争 瓶颈在 Ring Buffer 容量 |
| send 调度开销 | ~2-10μs/批 executor.submit + 线程调度 |
~0.1μs/批 while 循环, 无调度 |
| key 隔离 | 差: 共享线程池, slow key 堵住调度队列 | 好: 4 个独立 Sender, slow key 仅影响 1/4 |
| CPU 缓存命中 | 中: key 可能在不同线程处理 | 好: 同 key 始终在同一 Sender, 热数据常驻 L1/L2 |
| 理论最大 drain 速率 (假设 send 为 0ms) |
~100K-300K batches/s 受 lockRun + submit 开销限制 |
~500K-1M batches/s 仅 synchronized + while 循环 |
8. 场景化深度分析
场景 A: 正常低频 (1K key, 1K events/sec)
| GDP | V2 | |
|---|---|---|
| 锁竞争 | 极低 | 极低 |
| 内存 | ~105KB | ~48KB |
| P99 延迟 | ~2ms | ~2ms |
| 结论 | 两者表现接近, V2 内存更优 | |
场景 B: 高频突发 (100 key, 500K events/sec)
| GDP | V2 | |
|---|---|---|
| 入口竞争 | 高: 多业务线程争 DistributeLocker waitingCount 飙升 | 无: Disruptor 单线程 merge |
| 调度开销 | 高: 频繁 executor.submit + scheduleRun RunningTask 对象分配频繁 | 低: while 循环, 无额外调度 |
| 批间延迟累积 | 1000 批 × ~5μs 调度 = ~5ms | 1000 批 × ~0.1μs = ~0.1ms |
| P99 延迟 | ~10-30ms | ~2-5ms |
场景 C: 网络抖动 (wsPushProcess 间歇慢 50ms)
| GDP | V2 | |
|---|---|---|
| send 期间聚合 | 是: addData 不阻塞 | 是: onEvent 不阻塞 |
| 聚合效率 | 好: 50ms 内多次 addData merge | 好: 50ms 内多次 onEvent merge |
| 重试行为 | 固定 1s 重试, 可能过慢 | 200ms 首次重试, 退避到 30s |
| slow key 影响 | 共享线程池, 堵住其他 key 调度 | 仅影响同 slot 的 1/4 key |
场景 D: 大量 key (100K key, 低频)
| GDP | V2 | |
|---|---|---|
| 内存 | ~10.5MB | ~4.8MB (降 54%) |
| GC 压力 | 中: RunningTask + Future 频繁分配/回收 | 低: 无 RunningTask 分配 |
| 无数据时 | 每 key 延迟 1s 再检查 → 100K 个 1s 定时任务在线程池中 | 立即 cleanupKey 退出 → 无残留任务 |
| key 回收 | copyForUse(null) → 1s 后再查 → 最终 taskFutureMap.remove | senderLoop isEmpty → 立即 cleanupKey |
场景 E: 链路长时间故障 (wsPushProcess 持续失败 10 分钟)
| GDP | V2 | |
|---|---|---|
| 重试频率 | 每 1s 重试 → 10 分钟 = 600 次重试 持续施压下游 | 退避到 30s → 10 分钟 ≈ 20 次重试 给下游恢复空间 |
| 积攒效果 | 正常: addData 不受影响 | 正常: onEvent 不受影响 |
| 恢复后 | 立即 drain (copyForUse 全量) | 立即 drain (每批 500, while 循环) |
| 告警 | 无 (无告警机制) | retryCount > 10 告警; > 2000 严重告警 |
| 数据安全 | 不丢 (buffer 保持) | 不丢 (buffer 保持, 无限重试) |
9. 结论
9.1 相同点 (继承自 GDP 的核心模式)
| 核心模式 | 锁内 merge → 锁内 copy → 无锁 send → 锁内 remove → 循环 |
| 并行能力 | merge 和 send 真正并行 (send 期间不持锁) |
| 锁类型 | 共享锁数组 + synchronized |
| synchronized 次数 | 3 次 / 事件周期 |
| 数据安全 | 发送失败 → 数据保留 buffer → 重试 |
9.2 V2 相对 GDP 的改进
| 维度 | GDP | V2 改进 | 收益 |
|---|---|---|---|
| merge 竞争 | 多业务线程争锁 | Disruptor 单线程, 无争锁 | merge 路径零竞争 |
| 批间调度 | 递归 scheduleRun 经 executor.submit | while 循环, 无线程切换 | 批间延迟降 20-100 倍 |
| key 隔离 | 共享线程池 | 4 个专属 STPE, hash 分区 | slow key 影响范围缩小到 1/4 |
| CPU 缓存 | key 可能在不同线程处理 | 同 key 始终在同一线程 | L1/L2 缓存命中提升 |
| 对象分配 | RunningTask + Future + AtomicReference / 调度 | 仅 Runnable lambda / 调度 | GC 压力降低 |
| 内存 / key | ~108B | ~49B | 降 54% |
| 重试策略 | 固定 1s, 无退避, 无告警 | 指数退避 200ms→30s, 可配置次数, 多级告警 | 更优雅, 可观测 |
| 无数据 key | 1s 延迟后再检查 | 立即退出, 新数据重新调度 | 无残留定时任务 |
| 背压 | 依赖 executor 队列 | Disruptor Ring Buffer (固定大小, 天然背压) | 更可控 |
9.3 GDP 的优势 (V2 未覆盖)
| 维度 | GDP 优势 | V2 情况 |
|---|---|---|
| 通用性 | 通用框架, IGroupData 接口, 任何聚合场景 | WebSocket 推送专用 |
| 批量 add | addData(List) 同 key 在一把锁内多次 merge | 逐事件 merge (Disruptor 模型) |
| 锁监控 | waitingCounts + Metrics (lock waiting, lock time) | 无 (可按需添加) |
| 超时保护 | invokeTimeOut: RunningTask 超时检查 + 可选移除 | 无 (依赖 wsPushProcess 自身超时) |
| processorP1 返回值 | 支持 processor 返回 nextDelayTime 动态控制间隔 | 无 (固定 while 循环) |
| 拒绝策略 | setReject: maxSize 限制 + rejectHandler 回调 | 无 (Disruptor Ring Buffer 背压替代) |
9.4 一句话总结
V2 在继承 GDP "边攒边发" 核心模式的基础上,通过 Disruptor 单消费线程消除 merge 竞争、while 循环消除批间调度开销、hash 分区 STPE 实现 key 隔离、指数退避增强重试 — 以专用化设计换取在 WebSocket 推送场景下的更优性能。
V2 PushEventHandler vs GroupDataProcessor 深度对比 — 2026-03-20