1. V1 Problem: Merge and Send Are Effectively Serial
V1 submits both merge and send to the same single-threaded Sender:
// V1 onEvent:
senderExecutors[slot].execute(() -> mergeAndTriggerDrain(key, incoming));
// Inside mergeAndTriggerDrain:
mergeAdd(incoming);   // merge
drainAndSend(...);    // send (same thread; blocks subsequent merge tasks)
While wsPushProcess runs (potentially 1-10ms), new events sit in the executor queue and cannot be merged into batchMap. This is not true "merge while sending"; it is "accumulate for a while, then send for a while".
sequenceDiagram
participant DT as Disruptor thread
participant Q as Sender queue
participant ST as Sender thread
Note over DT,ST: V1 serial mode
DT->>Q: submit merge(v1)
Q->>ST: merge(v1) + drainAndSend
ST->>ST: wsPushProcess(v1) takes 5ms
DT->>Q: submit merge(v2) queued, waiting!
DT->>Q: submit merge(v3) queued, waiting!
Note over Q: v2, v3 sit in the queue, cannot merge
ST->>Q: drain done
Q->>ST: merge(v2) only starts now
In addition, V1's KeySendState is a private static inner class of PushEventHandler, defined in the code. V2 eliminates it entirely: its state is split across batchMap + activeSenders, which is simpler.
2. V2 Improvement: True Merge-While-Sending
2.1 Core Principle
Following the GroupDataProcessor pattern: merge happens on the Disruptor thread, send happens on a Sender thread, and the two coordinate through a very briefly held shared lock.
| Phase | Executing thread | Holds lock? | Duration |
|---|---|---|---|
| mergeAdd (merge data) | Disruptor consumer thread | Yes (~100-500ns) | Very short |
| copyForUse (copy batch) | Sender thread | Yes (~200-1000ns) | Very short |
| wsPushProcess (send) | Sender thread | No! | 1-10ms |
| mergeRemove (remove sent data) | Sender thread | Yes (~200-1000ns) | Very short |
Key point: wsPushProcess holds no lock while executing, so the Disruptor thread can merge new data concurrently, achieving true parallelism. The lock only guards reads and writes of the TreeMap, and is held for well under a microsecond.
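The phase table above can be sketched as a copy/send/remove loop. This is a minimal illustration with hypothetical names (a plain String buffer standing in for batchMap/PushModelTaskGroup), not the real handler:

```java
import java.util.Map;
import java.util.TreeMap;

// Sketch of the V2 pattern: copy under lock, send WITHOUT the lock,
// then remove under lock. The send phase never blocks producers.
public class CopySendRemoveSketch {
    private static final Object LOCK = new Object();
    private static final TreeMap<Long, String> buffer = new TreeMap<>();

    static TreeMap<Long, String> copyForUse(int max) {
        synchronized (LOCK) {                 // held only for the copy (~µs)
            TreeMap<Long, String> batch = new TreeMap<>();
            for (Map.Entry<Long, String> e : buffer.entrySet()) {
                batch.put(e.getKey(), e.getValue());
                if (batch.size() >= max) break;
            }
            return batch;
        }
    }

    static void send(TreeMap<Long, String> batch) {
        // wsPushProcess stand-in: runs with NO lock held, so producers
        // may merge new entries into `buffer` concurrently.
    }

    static void removeSent(TreeMap<Long, String> batch) {
        synchronized (LOCK) {                 // held only for the removal
            batch.keySet().forEach(buffer::remove);
        }
    }

    public static void main(String[] args) {
        synchronized (LOCK) { buffer.put(1L, "v1"); buffer.put(2L, "v2"); }
        TreeMap<Long, String> batch = copyForUse(500);
        send(batch);
        removeSent(batch);
        if (!buffer.isEmpty()) throw new AssertionError("buffer should be drained");
        System.out.println("drained");
    }
}
```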
2.2 Shared Lock Array (Replacing per-key ReentrantLock)
A shared array of 64 Object lock slots; hash(key) % 64 selects the lock. This avoids two problems of the 1.8 per-key ReentrantLock approach:
- No memory leak: the lock array is fixed at 64 Objects and does not grow or shrink with the key population
- Extremely lightweight: an uncontended synchronized costs only ~50ns, and plain Object monitors carry no AQS overhead
// 64 lock slots, allocated once, independent of the key population
private final Object[] locks = new Object[64];
private Object getLock(CombinedKeys key) {
int h = key.hashCode() ^ (key.hashCode() >>> 16);
return locks[(h & 0x7FFFFFFF) % 64];
}
2.3 Retry Mechanism
- Exponential backoff: delay = min(100ms × 2^retryCount, 30s)
- Configurable retry cap: 2000 by default; beyond it, retries continue but a severe alarm fires
- Accumulation continues during retries: the activeSenders flag stays set, and onEvent keeps merging new data normally
- After a successful retry, draining resumes: the senderLoop while-loop keeps sending batch by batch (500 per batch)
- No round-trip through the Disruptor: the retry is scheduled directly onto the same Sender thread, which is simpler
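The backoff formula above can be checked as a runnable sketch. It mirrors the clamped exponent used later in scheduleRetry; the class and method names here are illustrative:

```java
// Sketch of the retry delay: delay = min(BASE * 2^retryCount, MAX),
// with the exponent clamped so the left shift cannot overflow.
public class BackoffSketch {
    static final long BASE_RETRY_DELAY_MS = 100;
    static final long MAX_RETRY_DELAY_MS = 30_000;

    static long delay(int retryCount) {
        int exponent = Math.min(retryCount, 15); // clamp: 100ms * 2^15 already exceeds 30s
        return Math.min(BASE_RETRY_DELAY_MS * (1L << exponent), MAX_RETRY_DELAY_MS);
    }

    public static void main(String[] args) {
        // retries 1, 2, 3 back off geometrically; large counts hit the 30s cap
        System.out.println(delay(1) + " " + delay(2) + " " + delay(3) + " " + delay(100));
    }
}
```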
3. Core Flow Diagrams
3.1 V2 Overall Architecture
graph TD
subgraph Producers
P1["business thread"]
P2["business thread"]
end
subgraph DL["Disruptor layer"]
RB["Ring Buffer"]
DT["Disruptor consumer thread<br/>onEvent: merge + schedule check<br/>lock held ~100-500ns"]
end
subgraph SS["Shared state"]
BM["batchMap<br/>ConcurrentHashMap"]
LK["locks: 64 Object locks<br/>guard batchMap reads/writes"]
AS["activeSenders<br/>ConcurrentHashSet"]
end
subgraph SL["Sender layer"]
S0["Sender-0"]
S1["Sender-1"]
S2["Sender-2"]
S3["Sender-3"]
end
WSP["wsPushProcess<br/>lock-free send, 1-10ms"]
P1 -->|publish| RB
P2 -->|publish| RB
RB --> DT
DT -->|"merge under lock"| BM
DT -->|"first-seen key: schedule"| S0
DT -->|"first-seen key: schedule"| S1
DT -->|"first-seen key: schedule"| S2
DT -->|"first-seen key: schedule"| S3
S0 -->|"copy + remove under lock"| BM
S0 -->|"lock-free send"| WSP
S1 -->|"lock-free send"| WSP
S2 -->|"lock-free send"| WSP
S3 -->|"lock-free send"| WSP
WSP -->|"on failure: schedule retry"| S0
style WSP fill:#e8f5e9,stroke:#2e7d32
style DT fill:#fff3e0,stroke:#e65100
style LK fill:#fce4ec,stroke:#c62828
3.2 Truly Parallel Timing
sequenceDiagram
participant DT as Disruptor thread
participant LK as Shared lock
participant BM as batchMap
participant ST as Sender thread
Note over DT,ST: true merge-while-sending
DT->>LK: lock
DT->>BM: merge(v1,v2,v3)
DT->>LK: unlock
ST->>LK: lock
ST->>BM: copyForUse -> batch(v1,v2,v3)
ST->>LK: unlock
par Disruptor thread merges in parallel
DT->>LK: lock
DT->>BM: merge(v4,v5) runs concurrently!
DT->>LK: unlock
and Sender thread sends in parallel
ST->>ST: wsPushProcess(v1,v2,v3) lock-free! 5ms
end
ST->>LK: lock
ST->>BM: mergeRemove(v1,v2,v3)
ST->>LK: unlock
Note over BM: v4, v5 remain
ST->>LK: lock
ST->>BM: copyForUse -> batch(v4,v5)
ST->>LK: unlock
ST->>ST: wsPushProcess(v4,v5)
3.3 Sender Thread State Machine
flowchart TD
A["onEvent: merge data"] --> B{"activeSenders contains key?"}
B -->|No| C["add to activeSenders + schedule senderLoop"]
B -->|Yes| Z["done, a sender is already running"]
C --> D["senderLoop starts"]
D --> E["under lock: copyForUse max 500"]
E --> F{"batch empty?"}
F -->|Yes| G["under lock: cleanupKey removes key"]
G --> Z2["sender exits"]
F -->|No| H["lock-free: wsPushProcess"]
H --> I{"success?"}
I -->|Yes| J["under lock: mergeRemove"]
J --> E
I -->|Exception| K["scheduleRetry with exponential backoff"]
K --> Z3["exit loop, await retry"]
Z3 -->|"after delay"| D
style H fill:#e8f5e9
style K fill:#fce4ec
3.4 Retry + Continuous Accumulation Timing
sequenceDiagram
participant DT as Disruptor thread
participant BM as batchMap
participant ST as Sender thread
ST->>ST: wsPushProcess failed!
Note over ST: data is kept, activeSenders stays set
ST->>ST: schedule(retry in 200ms)
Note over DT,BM: accumulation continues while awaiting the retry
DT->>BM: merge(v4)
DT->>BM: merge(v5)
DT->>BM: merge(v6)
Note over BM: v1-v6 accumulated
Note over ST: 200ms elapsed
ST->>BM: copyForUse -> batch(first 500 of v1..v6)
ST->>ST: wsPushProcess(batch)
alt success
ST->>BM: mergeRemove + keep draining the next batch
else fails again
ST->>ST: schedule(retry in 400ms)
end
4. Full Class-Level Code
4.1 PushEvent (unchanged)
package com.upex.unified.account.counter.push.disruptor;
import com.upex.unified.account.counter.model.PushModelTaskGroup;
import lombok.Getter;
import lombok.Setter;
/**
 * Disruptor event carrier; a reused Ring Buffer slot object.
 */
public class PushEvent {
@Getter
@Setter
private PushModelTaskGroup pushModelTaskGroup;
public void clear() {
this.pushModelTaskGroup = null;
}
}
4.2 PushEventHandler (V2 core rewrite: true merge-while-sending)
Fully replaces the inner class of the same name inside WebSocketPushLauncher.
/**
 * V2 PushEventHandler: true merge-while-sending.
 *
 * Core idea (same as GroupDataProcessor):
 * Disruptor thread: merge under lock → decide whether to schedule a sender
 * Sender thread: copy under lock → send without lock → remove under lock → loop
 *
 * merge and send are truly parallel: no lock is held during wsPushProcess,
 * so the Disruptor thread can merge new data concurrently.
 *
 * The KeySendState class is no longer needed; its state is split across:
 * batchMap (aggregated data) + activeSenders (sender-running flag) + retryCount (passed as a parameter)
 */
public static class PushEventHandler implements EventHandler<PushEvent> {
// ==================== Constants ====================
/** Number of Sender threads per handler */
private static final int SENDER_THREAD_COUNT = 4;
/** Size of the shared lock array (power of two for cheap modulo) */
private static final int LOCK_COUNT = 64;
/** Config refresh frequency */
private static final int CONFIG_REFRESH_EVERY = 64;
/** Base retry delay, ms */
private static final long BASE_RETRY_DELAY_MS = 100;
/** Max retry delay, ms */
private static final long MAX_RETRY_DELAY_MS = 30_000;
/** Default max retry count (retries continue past it, but a severe alarm fires) */
private static final int DEFAULT_MAX_RETRY_COUNT = 2000;
/** Retry count threshold that triggers an alarm */
private static final int ALARM_RETRY_THRESHOLD = 10;
// ==================== Fields ====================
private final int index;
private final WebSocketPushLauncher webSocketPushLauncher;
private final MetricsReporterWrapper metricsReporter;
private final Supplier<UnifiedCommonConfig> commonConfigSupplier;
/** 4 single-threaded (coreSize=1) schedulers; hash(key)%4 gives key affinity */
private final ScheduledThreadPoolExecutor[] senderExecutors;
/**
 * Shared lock array: 64 Object lock slots.
 * hash(key) % 64 selects the lock, avoiding per-key lock allocation and leaks.
 * Locks only guard batchMap merge/copy/remove; held for <1μs.
 */
private final Object[] locks = new Object[LOCK_COUNT];
/**
 * Aggregation buffer: key → aggregated PushModelTaskGroup.
 * Written by the Disruptor thread (mergeAdd), read/written by Sender threads (copyForUse/mergeRemove).
 * All access happens inside a getLock(key) synchronized block.
 */
private final ConcurrentHashMap<CombinedKeys, PushModelTaskGroup> batchMap
    = new ConcurrentHashMap<>();
/**
 * Active-sender flags: a key's presence means a sender is running for it (or awaiting retry).
 * Prevents scheduling a duplicate sender for the same key.
 * All access happens inside a getLock(key) synchronized block.
 */
private final Set<CombinedKeys> activeSenders = ConcurrentHashMap.newKeySet();
/** Total buffered item count (for monitoring; cross-thread CAS) */
private final AtomicInteger bufferedItemCount = new AtomicInteger(0);
private volatile PushDisruptorManager pushDisruptorManager;
private RingBuffer<PushEvent> ringBuffer;
private volatile long sampleCounter;
private int configRefreshCounter;
private volatile int maxRetryCount = DEFAULT_MAX_RETRY_COUNT;
// ==================== Constructor (signature unchanged, compatible with PushDisruptorManager) ====================
public PushEventHandler(int index,
WebSocketPushLauncher webSocketPushLauncher,
MetricsReporterWrapper metricsReporter,
Supplier<UnifiedCommonConfig> commonConfigSupplier) {
this.index = index;
this.webSocketPushLauncher = webSocketPushLauncher;
this.metricsReporter = metricsReporter;
this.commonConfigSupplier = commonConfigSupplier;
// initialize the 64 locks
for (int i = 0; i < LOCK_COUNT; i++) {
locks[i] = new Object();
}
// create 4 single-threaded schedulers
this.senderExecutors = new ScheduledThreadPoolExecutor[SENDER_THREAD_COUNT];
for (int i = 0; i < SENDER_THREAD_COUNT; i++) {
ScheduledThreadPoolExecutor stpe = new ScheduledThreadPoolExecutor(
1, new NamedThreadFactory("push-sender-" + index + "-" + i, true));
stpe.setRemoveOnCancelPolicy(true);
senderExecutors[i] = stpe;
}
}
public void setPushDisruptorManager(PushDisruptorManager manager) {
this.pushDisruptorManager = manager;
}
// ==================== Disruptor Consumer Entry ====================
/**
 * Disruptor consumer thread: merge under lock + decide whether to schedule the first sender.
 * Lock held for ~100-500ns (just TreeMap.putAll); wsPushProcess never runs on this thread.
 */
@Override
public void onEvent(PushEvent event, long sequence, boolean endOfBatch) throws Exception {
try {
PushModelTaskGroup incoming = event.getPushModelTaskGroup();
if (incoming == null || incoming.isEmpty()) {
return;
}
refreshConfigIfNeeded();
CombinedKeys batchKey = (CombinedKeys) incoming.getKey();
boolean shouldSchedule = false;
// under the lock: merge + scheduling decision (atomic)
synchronized (getLock(batchKey)) {
PushModelTaskGroup agg = batchMap.get(batchKey);
if (agg == null) {
// first time: put directly
batchMap.put(batchKey, incoming);
bufferedItemCount.addAndGet(incoming.getWaitingSize());
} else {
// merge
int before = agg.getWaitingSize();
agg.mergeAdd(incoming);
int after = agg.getWaitingSize();
bufferedItemCount.addAndGet(Math.max(0, after - before));
}
// no active sender → flag + schedule
if (!activeSenders.contains(batchKey)) {
activeSenders.add(batchKey);
shouldSchedule = true;
}
// a sender is already running (or awaiting retry): no scheduling needed;
// the data is merged into the buffer and picked up on the sender's next copyForUse
}
// schedule outside the lock (avoid executor calls while holding it)
if (shouldSchedule) {
int slot = selectSlot(batchKey);
try {
senderExecutors[slot].execute(() -> senderLoop(batchKey, 0));
} catch (RejectedExecutionException e) {
log.warn("[Handler-{}] sender rejected, key={}", index, batchKey);
synchronized (getLock(batchKey)) {
activeSenders.remove(batchKey);
}
}
}
} finally {
event.clear();
sampleCounter++;
}
}
// ==================== Sender Thread Logic ====================
/**
 * Sender main loop: copy → send (lock-free) → remove → loop.
 *
 * Key point: no lock is held while wsPushProcess executes, so the
 * Disruptor thread can merge new data into batchMap concurrently.
 *
 * @param batchKey   target key
 * @param retryCount current retry count (0 on first run, incremented on failure)
 */
private void senderLoop(CombinedKeys batchKey, int retryCount) {
try {
while (true) {
// ---- Phase 1: copy a batch under the lock (at most 500) ----
PushModelTaskGroup batch;
synchronized (getLock(batchKey)) {
PushModelTaskGroup agg = batchMap.get(batchKey);
if (agg == null || agg.isEmpty()) {
// no data: clean up the key and exit
cleanupKey(batchKey);
return;
}
batch = agg.copyForUse(); // max 500
}
if (batch == null || batch.isEmpty()) {
synchronized (getLock(batchKey)) {
cleanupKey(batchKey);
}
return;
}
int batchSize = batch.getWaitingSize();
// ---- Phase 2: lock-free send (the expensive step) ----
try {
webSocketPushLauncher.wsPushProcess(batch);
retryCount = 0; // success: reset the retry counter
} catch (Exception e) {
// failure: keep the data and schedule a retry;
// the key stays in activeSenders → onEvent will not schedule a duplicate sender;
// the Disruptor thread keeps merging new data while the retry waits
log.error("[Handler-{}] send failed, key={}, retry={}, buffered={}",
index, batchKey, retryCount,
getKeyBufferedSize(batchKey), e);
scheduleRetry(batchKey, retryCount + 1);
return; // exit the loop and await the retry
}
// ---- Phase 3: remove sent data under the lock ----
synchronized (getLock(batchKey)) {
PushModelTaskGroup agg = batchMap.get(batchKey);
if (agg != null) {
agg.mergeRemove(batch);
}
bufferedItemCount.addAndGet(-batchSize);
}
// loop back to Phase 1 to check for remaining data
}
} catch (Throwable t) {
// unexpected exception: clean up state so a new sender can be scheduled
log.error("[Handler-{}] senderLoop unexpected error, key={}",
index, batchKey, t);
synchronized (getLock(batchKey)) {
activeSenders.remove(batchKey);
}
}
}
/**
 * Cleans up all state for a key. Must be called inside a getLock(key) synchronized block.
 *
 * Atomicity guarantee: the isEmpty check + remove + activeSenders.remove all happen
 * under the same lock, so there is no race with onEvent's merge + scheduling decision.
 */
private void cleanupKey(CombinedKeys batchKey) {
batchMap.remove(batchKey);
activeSenders.remove(batchKey);
}
// ==================== Retry ====================
/**
 * Schedules an exponential-backoff retry.
 *
 * The key stays in activeSenders → onEvent will not schedule a duplicate sender during the retry.
 * onEvent keeps merging new data normally → a successful retry drains the new data too.
 *
 * Retries continue past maxRetryCount (so no data is lost), but a severe alarm fires.
 *
 * @param batchKey   target key
 * @param retryCount current retry count (starts at 1)
 */
private void scheduleRetry(CombinedKeys batchKey, int retryCount) {
// alarms
if (retryCount > ALARM_RETRY_THRESHOLD && retryCount <= maxRetryCount) {
AlarmUtils.error("[Handler-{}] retry count={}, key={}", index, retryCount, batchKey);
}
if (retryCount > maxRetryCount) {
AlarmUtils.error("[Handler-{}] EXCEEDED maxRetry={}, still retrying, count={}, key={}",
index, maxRetryCount, retryCount, batchKey);
}
// exponential backoff: 100, 200, 400, 800, ... capped at 30s
int exponent = Math.min(retryCount, 15);
long delay = Math.min(BASE_RETRY_DELAY_MS * (1L << exponent), MAX_RETRY_DELAY_MS);
int slot = selectSlot(batchKey);
try {
senderExecutors[slot].schedule(
() -> senderLoop(batchKey, retryCount),
delay, TimeUnit.MILLISECONDS);
} catch (RejectedExecutionException e) {
// executor already shut down (graceful stop)
log.warn("[Handler-{}] retry schedule rejected, key={}", index, batchKey);
synchronized (getLock(batchKey)) {
activeSenders.remove(batchKey);
}
}
}
// ==================== Utilities ====================
/** Shared lock selection: hash spreading + modulo */
private Object getLock(CombinedKeys key) {
int h = key.hashCode();
h ^= (h >>> 16);
return locks[(h & 0x7FFFFFFF) % LOCK_COUNT];
}
/** Sender thread selection: same key → same slot, preserving per-key ordering */
private int selectSlot(CombinedKeys batchKey) {
int h = batchKey.hashCode();
h ^= (h >>> 16);
return (h & 0x7FFFFFFF) % SENDER_THREAD_COUNT;
}
/** Current buffered size for a key (for logging) */
private int getKeyBufferedSize(CombinedKeys key) {
PushModelTaskGroup agg = batchMap.get(key);
return agg != null ? agg.getWaitingSize() : 0;
}
// ==================== Lifecycle ====================
public void destroySenders() {
for (ScheduledThreadPoolExecutor executor : senderExecutors) {
if (executor != null) {
executor.shutdown();
try {
if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
log.warn("[Handler-{}] sender not terminated in 30s, forcing", index);
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
}
// ==================== Config ====================
private void refreshConfigIfNeeded() {
if (++configRefreshCounter >= CONFIG_REFRESH_EVERY) {
configRefreshCounter = 0;
UnifiedCommonConfig config = commonConfigSupplier.get();
if (config != null) {
int configured = config.getPushMaxRetryCount();
if (configured > 0) {
this.maxRetryCount = configured;
}
}
}
}
// ==================== Monitoring ====================
public void initMonitor(RingBuffer<PushEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
List<Tag> extraTags = List.of(Tag.of("index", String.valueOf(index)));
metricsReporter.gauge(UtaMetricsEnum.UTA_GAUGE_WS_PUSH_DISRUPTOR_BACKLOG,
() -> this.ringBuffer.getBufferSize() - this.ringBuffer.remainingCapacity(),
extraTags);
metricsReporter.gauge(UtaMetricsEnum.UTA_GAUGE_WS_PUSH_DISRUPTOR_QPS,
() -> sampleCounter, extraTags);
metricsReporter.gauge(UtaMetricsEnum.UTA_GAUGE_WS_PUSH_DISRUPTOR_BUFFERED_ITEMS,
bufferedItemCount::get, extraTags);
metricsReporter.gauge(UtaMetricsEnum.UTA_GAUGE_WS_PUSH_DISRUPTOR_BUFFERED_KEYS,
this::getBufferedKeyCount, extraTags);
for (int i = 0; i < SENDER_THREAD_COUNT; i++) {
final int si = i;
List<Tag> st = List.of(
Tag.of("index", String.valueOf(index)),
Tag.of("sender", String.valueOf(i)));
metricsReporter.gauge(UtaMetricsEnum.UTA_GAUGE_WS_PUSH_SENDER_QUEUE_DEPTH,
() -> senderExecutors[si].getQueue().size(), st);
}
}
int getBufferedKeyCount() {
return batchMap.size();
}
}
4.3 PushDisruptorManager (full class)
package com.upex.unified.account.counter.push.disruptor;
import com.alipay.sofa.jraft.util.DisruptorBuilder;
import com.alipay.sofa.jraft.util.LogExceptionHandler;
import com.alipay.sofa.jraft.util.NamedThreadFactory;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.LiteBlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.upex.unified.account.counter.launcher.WebSocketPushLauncher;
import com.upex.unified.account.counter.model.PushModelTaskGroup;
import com.upex.unified.account.facade.utils.AccountIdJoinUtils;
import com.upex.unified.account.service.config.apollo.UnifiedCommonConfig;
import com.upex.unified.account.service.metrics.reporter.MetricsReporterWrapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import java.util.function.Supplier;
@Slf4j
public class PushDisruptorManager {
private static final int DISRUPTOR_COUNT = 32;
private static final int RING_BUFFER_SIZE = 1 << 15;
private static final int MASK = DISRUPTOR_COUNT - 1;
@SuppressWarnings("unchecked")
private final Disruptor<PushEvent>[] disruptors
= (Disruptor<PushEvent>[]) new Disruptor[DISRUPTOR_COUNT];
@SuppressWarnings("unchecked")
private final RingBuffer<PushEvent>[] ringBuffers
= (RingBuffer<PushEvent>[]) new RingBuffer[DISRUPTOR_COUNT];
private final WebSocketPushLauncher.PushEventHandler[] eventHandlers
= new WebSocketPushLauncher.PushEventHandler[DISRUPTOR_COUNT];
private final WebSocketPushLauncher webSocketPushLauncher;
private final Supplier<UnifiedCommonConfig> commonConfigSupplier;
public PushDisruptorManager(WebSocketPushLauncher webSocketPushLauncher,
Supplier<UnifiedCommonConfig> commonConfigSupplier) {
this.webSocketPushLauncher = webSocketPushLauncher;
this.commonConfigSupplier = commonConfigSupplier;
}
/** Initialization (change: Phase 2 injects the manager reference) */
public void initPushEngine(MetricsReporterWrapper metricsReporter) {
try {
for (int i = 0; i < DISRUPTOR_COUNT; i++) {
initPushDisruptor(i, metricsReporter);
}
// Phase 2: inject the manager reference once all RingBuffers are ready
for (int i = 0; i < DISRUPTOR_COUNT; i++) {
eventHandlers[i].setPushDisruptorManager(this);
}
log.info("Disruptor init done, total {}", DISRUPTOR_COUNT);
} catch (Exception e) {
log.error("initPushEngine error.", e);
}
}
/** Initializes a single Disruptor (unchanged) */
void initPushDisruptor(int index, MetricsReporterWrapper metricsReporter) {
String prefix = "pushEngineDisruptor-" + index;
NamedThreadFactory threadFactory = new NamedThreadFactory(prefix, true);
disruptors[index] = DisruptorBuilder.<PushEvent>newInstance()
.setRingBufferSize(RING_BUFFER_SIZE)
.setEventFactory(new PushEventFactory())
.setThreadFactory(threadFactory)
.setProducerType(ProducerType.MULTI)
.setWaitStrategy(new LiteBlockingWaitStrategy())
.build();
eventHandlers[index] = new WebSocketPushLauncher.PushEventHandler(
index, webSocketPushLauncher, metricsReporter, commonConfigSupplier);
disruptors[index].handleEventsWith(eventHandlers[index]);
disruptors[index].setDefaultExceptionHandler(new LogExceptionHandler<>(prefix));
ringBuffers[index] = disruptors[index].start();
eventHandlers[index].initMonitor(ringBuffers[index]);
}
/** Publishes an event (unchanged) */
public void publish(String shardingKey, PushModelTaskGroup pushModelTaskGroups) {
Pair<Long, Long> accountIdPair = AccountIdJoinUtils.mustGetIdsWithOutExe(shardingKey);
int idx = indexOfMurmurHash3(accountIdPair.getLeft());
RingBuffer<PushEvent> ringBuffer = ringBuffers[idx];
long sequence = ringBuffer.next();
try {
PushEvent pushEvent = ringBuffer.get(sequence);
pushEvent.setPushModelTaskGroup(pushModelTaskGroups);
} finally {
ringBuffer.publish(sequence);
}
}
int indexOfMurmurHash3(long accountId) {
long x = accountId;
x ^= (x >>> 33);
x *= 0xff51afd7ed558ccdL;
x ^= (x >>> 33);
return ((int) x) & MASK;
}
/** Graceful shutdown (change: Disruptors first, then Senders) */
public void destroy() throws Exception {
for (int i = 0; i < DISRUPTOR_COUNT; i++) {
if (disruptors[i] != null) disruptors[i].shutdown();
}
for (int i = 0; i < DISRUPTOR_COUNT; i++) {
if (eventHandlers[i] != null) eventHandlers[i].destroySenders();
}
log.info("All Disruptors and senders shutdown.");
}
public static class PushEventFactory implements EventFactory<PushEvent> {
@Override
public PushEvent newInstance() { return new PushEvent(); }
}
}
4.4 PushModelTaskGroup (unchanged)
package com.upex.unified.account.counter.model;
import com.upex.mixcontract.common.framework.model.IGroupData;
import com.upex.mixcontract.common.repo.CombinedKeys;
import com.upex.stream.dto.websocket.CacheKey;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.TreeMap;
@Slf4j
@Getter
public class PushModelTaskGroup implements IGroupData<PushModelTaskGroup> {
private final CacheKey cacheKey;
private TreeMap<Long, Object> wsPushMap = new TreeMap<>();
private CombinedKeys key = null;
public PushModelTaskGroup(CacheKey cacheKey, long version, Object wsPushModel) {
this.cacheKey = cacheKey;
if (wsPushModel != null) {
this.wsPushMap.put(version, wsPushModel);
}
}
@Override
public Object getKey() {
if (key == null) {
this.key = CombinedKeys.builder()
.appendColumn(cacheKey.getTopic())
.appendColumn(cacheKey.getKey2())
.appendColumn(cacheKey.getKey3())
.build();
}
return key;
}
@Override public void mergeAdd(PushModelTaskGroup data) { wsPushMap.putAll(data.getWsPushMap()); }
@Override
public void mergeRemove(PushModelTaskGroup data) {
for (Map.Entry<Long, Object> entry : data.wsPushMap.entrySet()) {
Object pushModel = this.wsPushMap.get(entry.getKey());
if (pushModel == entry.getValue()) { this.wsPushMap.remove(entry.getKey()); }
}
}
@Override
public PushModelTaskGroup copyForUse() {
PushModelTaskGroup copy = new PushModelTaskGroup(this.cacheKey, 0, null);
int count = 0;
for (Map.Entry<Long, Object> entry : this.wsPushMap.entrySet()) {
copy.wsPushMap.put(entry.getKey(), entry.getValue());
if (++count >= 500) break;
}
return copy;
}
@Override public boolean isEmpty() { return wsPushMap.isEmpty(); }
@Override public void clear() { wsPushMap.clear(); }
@Override public int getWaitingSize() { return wsPushMap.size(); }
}
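One subtlety in the class above: mergeRemove compares values by reference (`==`), not `equals()`. A minimal sketch (with hypothetical String payloads standing in for push models) of why this matters when a version is overwritten while its batch is in flight:

```java
import java.util.TreeMap;

// Sketch: mergeRemove must remove an entry only if the buffered object
// IS the object that was sent. If the same version key was overwritten
// with a NEWER object during the send, the newer value must survive.
public class MergeRemoveSketch {
    public static void main(String[] args) {
        TreeMap<Long, Object> buffer = new TreeMap<>();
        Object sent = "payload-A";
        buffer.put(1L, sent);

        // copyForUse: shallow copy of the batch being sent
        TreeMap<Long, Object> batch = new TreeMap<>(buffer);

        // while the batch is in flight, version 1 is overwritten
        Object newer = new String("payload-A"); // equal but a different object
        buffer.put(1L, newer);

        // mergeRemove: identity comparison, not equals()
        batch.forEach((k, v) -> { if (buffer.get(k) == v) buffer.remove(k); });

        // the newer object survives and will go out in the next batch
        if (!buffer.containsKey(1L)) throw new AssertionError("newer value was lost");
        System.out.println("newer value retained");
    }
}
```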
5. Detailed Four-Way Performance Comparison
GroupDataProcessor (GDP), 1.8 ReentrantLock (1.8RL), V1 thread affinity, V2 parallel merge-and-send
5.1 Lock/Synchronization Operations per Event Cycle
| Phase | GDP | 1.8 RL | V1 | V2 |
|---|---|---|---|---|
| merge | 1× synchronized | 1× ReentrantLock | 0 (thread affinity) | 1× synchronized ~100-500ns |
| scheduling check | 1× CHM.get | 1× CAS | 0 (boolean check) | Set.contains under lock ~10ns |
| copyForUse | 1× synchronized | 1× ReentrantLock | 0 | 1× synchronized ~200-1000ns |
| wsPushProcess | identical for all four: lock-free, 1-10ms | | | |
| mergeRemove | 1× synchronized | 1× ReentrantLock | 0 | 1× synchronized ~200-1000ns |
| total lock ops | 3× sync | 4× RL + CAS | 0 locks + 1 CAS | 3× sync (very short) |
| merge/send parallel? | Yes (different threads) | Yes (no lock during send) | No (serial on one thread) | Yes (no lock during send) |
V2 introduces 3 synchronized sections, each held for only ~100-1000ns, in exchange for true merge/send parallelism.
Compared with V1's zero locks: V1 is lock-free, but merge and send run serially on the same thread, so no merging can happen during the 1-10ms of wsPushProcess.
V2's ~3μs of lock overhead vs V1's 1-10ms of serial waiting → V2's throughput is markedly better.
5.2 Concurrency and Throughput
| Metric | GDP | 1.8 RL | V1 | V2 |
|---|---|---|---|---|
| Disruptor thread work | N/A | merge under RL ~0.1-10μs | submit via execute only ~0.1μs | merge under sync ~0.1-0.5μs |
| Sender availability (send time / total) | ~99%, only brief lock holds | ~95%, RL contention loss | merge blocked during send, low effective utilization | ~99%, sync is very short |
| High-frequency 500K evt/s | sync contention | severe RL contention | serial bottleneck | uncontended sync (phases interleave) |
| Key isolation | Poor (shared pool) | Poor (shared sender) | Good (1/4 blast radius) | Good (1/4 blast radius) |
| Memory per key | ~108B | ~208B | ~56B | ~40B (just a CHM entry) |
| Leak risk | Low | High | None | None (cleanupKey) |
5.3 Scenario Comparison
Scenario: burst of high-frequency traffic + network jitter (wsPushProcess intermittently slow, 50ms)
| | V1 (thread affinity) | V2 (parallel merge-and-send) |
|---|---|---|
| During a 50ms send | ~50 events queue in the executor, cannot merge, processed one by one | all ~50 events merge into batchMap, taken in one copy on the next drain |
| Effective batch size | 1 event per send (serial queueing) | up to 500 per send (aggregation) |
| wsPushProcess calls | 50 (one per event) | 1 (one aggregated batch) |
| Total latency | 50 × (merge + send) ≈ 2.5s | 1 × send ≈ 50ms |
V2's aggregation effect is dramatic: all data arriving during a send merges into the buffer and goes out in one batch on the next drain. V1 processes events one by one, making 50× as many wsPushProcess calls as V2.
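The latency arithmetic in the table above can be checked directly; the numbers are the scenario's hypothetical figures, not measurements:

```java
// Sketch of the 5.3 scenario arithmetic: 50 events arrive while one
// send takes 50ms. V1 pays one send per event; V2 pays one send total.
public class BatchingEffectSketch {
    public static void main(String[] args) {
        int events = 50;
        long sendMs = 50;
        long serialTotalMs = events * sendMs;  // V1: one wsPushProcess per event
        long batchedTotalMs = 1 * sendMs;      // V2: one aggregated wsPushProcess
        if (serialTotalMs != 2500 || batchedTotalMs != 50) throw new AssertionError();
        System.out.println(serialTotalMs + "ms vs " + batchedTotalMs + "ms");
    }
}
```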
6. Summary
6.1 Core V1 → V2 Improvements
| Problem | V1 | V2 |
|---|---|---|
| Serial merge/send | same thread; wsPushProcess blocks merge | Disruptor thread merges, Sender thread sends, truly parallel |
| KeySendState state class | private inner class, opaque | eliminated entirely; state split across batchMap + activeSenders |
| Fragile retry | capped at 10, complex empty-trigger mechanism | 2000 configurable retries, scheduled directly on the Sender, accumulation continues |
| First send | submitted via executor.execute | same: executor.execute submits immediately (no delay) |
6.2 Design Decisions
| Decision | Choice | Rationale |
|---|---|---|
| Synchronization | shared array of 64 Object locks | fixed size, no leaks; uncontended synchronized ~50ns, far better than per-key ReentrantLock |
| Where merge runs | Disruptor thread (under lock) | lets merge run in parallel with send; lock held <1μs, does not stall the Disruptor |
| Retry mechanism | scheduled directly on the Sender thread | simpler than V1's empty-trigger scheme; avoids extra Disruptor load |
| activeSenders cleanup | cleanupKey clears atomically under the lock | mutually exclusive with onEvent's scheduling decision; no race window |
6.3 Scope of Changes
| File | Change |
|---|---|
| PushEvent | unchanged |
| PushEventHandler | fully rewritten (V2: parallel merge-and-send + shared locks + robust retry) |
| PushDisruptorManager | minor changes (manager injection + shutdown order) |
| PushModelTaskGroup | unchanged |
| UtaMetricsEnum | new SENDER_QUEUE_DEPTH metric |
| UnifiedCommonConfig | new pushMaxRetryCount config item |
7. Risks and Recommendations
Risk 1: shared lock slot collisions
Different keys can map to the same lock slot (64 slots), so under high frequency, merge and copy for same-slot keys may contend briefly.
- Locks are held <1μs, so the contention probability is very low
- If needed, LOCK_COUNT can be raised to 128 or 256
Risk 2: accumulation across 2000 retries
In an extreme scenario of 2000 consecutive failures (at 30s intervals), data keeps accumulating and memory may grow.
- Monitor via the BUFFERED_ITEMS metric
- Combine with the maxRetryCount alarm to intervene promptly
Rollout recommendations:
- Canary on 2-3 handlers; watch Disruptor backlog and sender queue depth
- Compare V1 vs V2: wsPushProcess call count, P99 latency, buffered key count
- Verify lock contention (e.g. sample with JFR or async-profiler)
Key Configuration
| Parameter | Default | Description |
|---|---|---|
| SENDER_THREAD_COUNT | 4 | Sender threads per handler |
| LOCK_COUNT | 64 | shared lock array size |
| DEFAULT_MAX_RETRY_COUNT | 2000 | max retry count (dynamically configurable via Apollo) |
| BASE_RETRY_DELAY_MS | 100 | base retry delay |
| MAX_RETRY_DELAY_MS | 30000 | max retry delay |
| copyForUse cap | 500 | max items sent per batch |
PushEventHandler V2: True Merge-While-Sending (2026-03-20)