1. V1 Problem: Merge and Send Are Effectively Serial

V1 submits both merge and send to the same single-threaded Sender executor:

// V1 onEvent:
senderExecutors[slot].execute(() -> mergeAndTriggerDrain(key, incoming));

// inside mergeAndTriggerDrain:
mergeAdd(incoming);   // merge
drainAndSend(...);    // send (same thread; blocks subsequent merge tasks)

While wsPushProcess runs (possibly 1-10ms), new events sit in the executor queue and cannot be merged into batchMap. This is not true merge-while-sending; it is "accumulate for a while, then send for a while".

sequenceDiagram
    participant DT as Disruptor thread
    participant Q as Sender queue
    participant ST as Sender thread
    Note over DT,ST: V1 serial mode
    DT->>Q: submit merge(v1)
    Q->>ST: merge(v1) + drainAndSend
    ST->>ST: wsPushProcess(v1) takes 5ms
    DT->>Q: submit merge(v2), queued and waiting!
    DT->>Q: submit merge(v3), queued and waiting!
    Note over Q: v2, v3 sit in the queue and cannot be merged
    ST->>Q: drain done
    Q->>ST: merge(v2) only starts now

Also, in V1 KeySendState is a private static inner class of PushEventHandler, already defined in the code. V2 eliminates it entirely: its state is split across batchMap + activeSenders, which is simpler.

2. V2 Improvement: True Merge-While-Sending

2.1 Core Principle

Following the GroupDataProcessor pattern: merge happens on the Disruptor thread, send happens on a Sender thread, and the two coordinate through a shared lock held only for very short stretches:

| Phase | Executing thread | Holds lock? | Duration |
|---|---|---|---|
| mergeAdd (merge data) | Disruptor consumer thread | Yes (~100-500ns) | very short |
| copyForUse (copy batch) | Sender thread | Yes (~200-1000ns) | very short |
| wsPushProcess (send) | Sender thread | No! | 1-10ms |
| mergeRemove (clear sent data) | Sender thread | Yes (~200-1000ns) | very short |

Key point: wsPushProcess holds no lock while it runs, so the Disruptor thread can merge new data at the same time, giving true parallelism. The lock only guards reads and writes of the TreeMap, and is held on the order of microseconds.
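The lock discipline above can be sketched as a minimal, self-contained model. This is a sketch, not the production classes: the buffer and send stand-ins are hypothetical, and the 500-item batch cap is omitted for brevity.

```java
import java.util.TreeMap;

// Minimal model of the V2 lock discipline: merge/copy/remove run inside
// short synchronized sections; the slow "send" runs with no lock held.
public class LockPhaseSketch {
    private final Object lock = new Object();
    private final TreeMap<Long, Object> buffer = new TreeMap<>();

    // Disruptor thread: merge under the lock (fast, essentially a putAll)
    public void mergeAdd(TreeMap<Long, Object> incoming) {
        synchronized (lock) { buffer.putAll(incoming); }
    }

    // Sender thread: copy under the lock, send without it, remove under it
    public int drainOnce() {
        TreeMap<Long, Object> batch;
        synchronized (lock) {                 // Phase 1: copy a batch
            batch = new TreeMap<>(buffer);
        }
        send(batch);                          // Phase 2: lock-free send
        synchronized (lock) {                 // Phase 3: remove what was sent
            for (Long version : batch.keySet()) buffer.remove(version);
        }
        return batch.size();
    }

    private void send(TreeMap<Long, Object> batch) { /* slow I/O would go here */ }

    public int buffered() { synchronized (lock) { return buffer.size(); } }
}
```

While `send` runs, another thread can enter `mergeAdd` freely; the two only serialize on the sub-microsecond map operations.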

2.2 Shared Lock Array (replacing the per-key ReentrantLock)

A shared array of 64 Object lock slots; the lock is chosen by hash(key) % 64. This avoids the two problems of the 1.8 per-key ReentrantLock: per-key lock allocation and lock-object leakage:

// 64 lock slots, fixed allocation, unaffected by keys coming and going
private final Object[] locks = new Object[64];

private Object getLock(CombinedKeys key) {
    int h = key.hashCode() ^ (key.hashCode() >>> 16);
    return locks[(h & 0x7FFFFFFF) % 64];
}
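The key property of this striping is determinism: the same key always maps to the same slot, so all merge/copy/remove operations for one key serialize on one monitor. A standalone sketch (the class name is hypothetical; the hash-spreading math matches getLock above):

```java
// Lock striping: 64 fixed Object slots, slot chosen by a spread hash.
// No per-key allocation, so nothing can leak as keys come and go.
public class LockStripes {
    private static final int LOCK_COUNT = 64;
    private final Object[] locks = new Object[LOCK_COUNT];

    public LockStripes() {
        for (int i = 0; i < LOCK_COUNT; i++) locks[i] = new Object();
    }

    // Same spreading as the handler: XOR-fold the high bits into the low
    // bits, mask off the sign, then take the slot index modulo 64.
    static int slotOf(int keyHash) {
        int h = keyHash ^ (keyHash >>> 16);
        return (h & 0x7FFFFFFF) % LOCK_COUNT;
    }

    public Object lockFor(int keyHash) { return locks[slotOf(keyHash)]; }
}
```

Distinct keys can share a slot (see Risk 1 below in the original numbering), but since the lock is held for under a microsecond, the cost of a collision is tiny.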

2.3 Retry Mechanism
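V2's retry policy, implemented in scheduleRetry below, is exponential backoff with a hard cap. A standalone sketch of the delay schedule, mirroring the handler's constants (base 100ms, cap 30s, exponent capped at 15 to avoid overflow):

```java
public class RetryBackoff {
    static final long BASE_RETRY_DELAY_MS = 100;
    static final long MAX_RETRY_DELAY_MS = 30_000;

    // Mirrors scheduleRetry's math. retryCount starts at 1, so the first
    // retry waits 200ms, then 400ms, 800ms, ... up to the 30s ceiling.
    static long delayFor(int retryCount) {
        int exponent = Math.min(retryCount, 15);
        return Math.min(BASE_RETRY_DELAY_MS * (1L << exponent), MAX_RETRY_DELAY_MS);
    }
}
```

Note that with a base of 100ms and retryCount starting at 1, the first actual delay is 200ms, which matches the 200ms/400ms steps in the sequence diagram of section 3.4.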

3. Core Flow Diagrams

3.1 V2 Overall Architecture

graph TD
    subgraph Producers
        P1["business thread"]
        P2["business thread"]
    end
    subgraph "Disruptor layer"
        RB["Ring Buffer"]
        DT["Disruptor consumer thread<br/>onEvent: merge + scheduling check<br/>holds lock ~100-500ns"]
    end
    subgraph "Shared state"
        BM["batchMap<br/>ConcurrentHashMap"]
        LK["locks: 64 Object locks<br/>guard batchMap reads/writes"]
        AS["activeSenders<br/>ConcurrentHashSet"]
    end
    subgraph "Sender layer"
        S0["Sender-0"]
        S1["Sender-1"]
        S2["Sender-2"]
        S3["Sender-3"]
    end
    WSP["wsPushProcess<br/>lock-free send, 1-10ms"]
    P1 -->|publish| RB
    P2 -->|publish| RB
    RB --> DT
    DT -->|"merge under lock"| BM
    DT -->|"first-seen key: schedule"| S0
    DT -->|"first-seen key: schedule"| S1
    DT -->|"first-seen key: schedule"| S2
    DT -->|"first-seen key: schedule"| S3
    S0 -->|"copy under lock + remove under lock"| BM
    S0 -->|"lock-free send"| WSP
    S1 -->|"lock-free send"| WSP
    S2 -->|"lock-free send"| WSP
    S3 -->|"lock-free send"| WSP
    WSP -->|"failure: schedule a retry"| S0
    style WSP fill:#e8f5e9,stroke:#2e7d32
    style DT fill:#fff3e0,stroke:#e65100
    style LK fill:#fce4ec,stroke:#c62828

3.2 Truly Parallel Sequence

sequenceDiagram
    participant DT as Disruptor thread
    participant LK as shared lock
    participant BM as batchMap
    participant ST as Sender thread
    Note over DT,ST: true merge-while-sending
    DT->>LK: lock
    DT->>BM: merge(v1,v2,v3)
    DT->>LK: unlock
    ST->>LK: lock
    ST->>BM: copyForUse -> batch(v1,v2,v3)
    ST->>LK: unlock
    par Disruptor thread merges in parallel
        DT->>LK: lock
        DT->>BM: merge(v4,v5), in parallel!
        DT->>LK: unlock
    and Sender thread sends in parallel
        ST->>ST: wsPushProcess(v1,v2,v3), lock-free! 5ms
    end
    ST->>LK: lock
    ST->>BM: mergeRemove(v1,v2,v3)
    ST->>LK: unlock
    Note over BM: v4,v5 remain
    ST->>LK: lock
    ST->>BM: copyForUse -> batch(v4,v5)
    ST->>LK: unlock
    ST->>ST: wsPushProcess(v4,v5)

3.3 Sender Thread State Machine

flowchart TD
    A["onEvent: merge data"] --> B{"activeSenders contains key?"}
    B -->|No| C["add to activeSenders + schedule senderLoop"]
    B -->|Yes| Z["done, a sender is already running"]
    C --> D["senderLoop starts"]
    D --> E["under lock: copyForUse, max 500"]
    E --> F{"batch empty?"}
    F -->|Yes| G["under lock: cleanupKey removes the key"]
    G --> Z2["sender exits"]
    F -->|No| H["lock-free: wsPushProcess"]
    H --> I{"success?"}
    I -->|Yes| J["under lock: mergeRemove"]
    J --> E
    I -->|Exception| K["scheduleRetry, exponential backoff"]
    K --> Z3["exit loop, wait for retry"]
    Z3 -->|"after delay"| D
    style H fill:#e8f5e9
    style K fill:#fce4ec

3.4 Retry + Continuous Accumulation Sequence

sequenceDiagram
    participant DT as Disruptor thread
    participant BM as batchMap
    participant ST as Sender thread
    ST->>ST: wsPushProcess fails!
    Note over ST: data is not removed, activeSenders stays true
    ST->>ST: schedule(retry in 200ms)
    Note over DT,BM: accumulation continues while the retry waits
    DT->>BM: merge(v4)
    DT->>BM: merge(v5)
    DT->>BM: merge(v6)
    Note over BM: v1-v6 accumulated
    Note over ST: 200ms elapse
    ST->>BM: copyForUse -> batch(first 500 of v1..v6)
    ST->>ST: wsPushProcess(batch)
    alt success
        ST->>BM: mergeRemove + keep draining the next batch
    else fails again
        ST->>ST: schedule(retry in 400ms)
    end

4. Complete Class-Level Code

4.1 PushEvent (unchanged)

package com.upex.unified.account.counter.push.disruptor;

import com.upex.unified.account.counter.model.PushModelTaskGroup;
import lombok.Getter;
import lombok.Setter;

/**
 * Disruptor event carrier; the reusable Ring Buffer slot object.
 */
public class PushEvent {

    @Getter
    @Setter
    private PushModelTaskGroup pushModelTaskGroup;

    public void clear() {
        this.pushModelTaskGroup = null;
    }
}

4.2 PushEventHandler (V2 core refactor: true merge-while-sending)

Fully replaces the inner class of the same name inside WebSocketPushLauncher.

    /**
     * V2 PushEventHandler: true merge-while-sending.
     *
     * Core idea (same as GroupDataProcessor):
     *   Disruptor thread: merge under lock → decide whether to schedule a sender
     *   Sender thread:    copy under lock → lock-free send → remove under lock → loop
     *
     * merge and send are truly parallel: no lock is held during wsPushProcess,
     * so the Disruptor thread can merge new data at the same time.
     *
     * The KeySendState class is no longer needed; its state is split across:
     *   batchMap (aggregated data) + activeSenders (sender-running flag) + retryCount (passed as a parameter)
     */
    public static class PushEventHandler implements EventHandler<PushEvent> {

        // ==================== Constants ====================

        /** Number of Sender threads per handler */
        private static final int SENDER_THREAD_COUNT = 4;

        /** Shared lock array size (power of two, convenient for modulo) */
        private static final int LOCK_COUNT = 64;

        /** Config refresh frequency */
        private static final int CONFIG_REFRESH_EVERY = 64;

        /** Base retry delay, ms */
        private static final long BASE_RETRY_DELAY_MS = 100;

        /** Maximum retry delay, ms */
        private static final long MAX_RETRY_DELAY_MS = 30_000;

        /** Default maximum retry count (retries continue beyond it, but a critical alarm fires) */
        private static final int DEFAULT_MAX_RETRY_COUNT = 2000;

        /** Retry-count threshold that triggers an alarm */
        private static final int ALARM_RETRY_THRESHOLD = 10;

        // ==================== Fields ====================

        private final int index;
        private final WebSocketPushLauncher webSocketPushLauncher;
        private final MetricsReporterWrapper metricsReporter;
        private final Supplier<UnifiedCommonConfig> commonConfigSupplier;

        /** 4 single-threaded (coreSize=1) schedulers; hash(key)%4 keeps key affinity */
        private final ScheduledThreadPoolExecutor[] senderExecutors;

        /**
         * Shared lock array: 64 Object lock slots.
         * hash(key) % 64 selects the lock, avoiding per-key lock allocation and leakage.
         * The locks only guard batchMap merge/copy/remove; held <1μs.
         */
        private final Object[] locks = new Object[LOCK_COUNT];

        /**
         * Aggregation buffer: key → aggregated PushModelTaskGroup.
         * Written by the Disruptor thread (mergeAdd), read and written by Sender threads (copyForUse/mergeRemove).
         * All access happens inside a getLock(key) synchronized block.
         */
        private final ConcurrentHashMap<CombinedKeys, PushModelTaskGroup> batchMap
                = new ConcurrentHashMap<>();

        /**
         * Active-sender flags: a key's presence means a sender is running (or waiting to retry) for it.
         * Prevents scheduling duplicate senders for the same key.
         * All access happens inside a getLock(key) synchronized block.
         */
        private final Set<CombinedKeys> activeSenders = ConcurrentHashMap.newKeySet();

        /** Total buffered item count (for monitoring; cross-thread CAS) */
        private final AtomicInteger bufferedItemCount = new AtomicInteger(0);

        private volatile PushDisruptorManager pushDisruptorManager;
        private RingBuffer<PushEvent> ringBuffer;
        private volatile long sampleCounter;
        private int configRefreshCounter;
        private volatile int maxRetryCount = DEFAULT_MAX_RETRY_COUNT;

        // ==================== Constructor (signature unchanged, compatible with PushDisruptorManager) ====================

        public PushEventHandler(int index,
                                WebSocketPushLauncher webSocketPushLauncher,
                                MetricsReporterWrapper metricsReporter,
                                Supplier<UnifiedCommonConfig> commonConfigSupplier) {
            this.index = index;
            this.webSocketPushLauncher = webSocketPushLauncher;
            this.metricsReporter = metricsReporter;
            this.commonConfigSupplier = commonConfigSupplier;

            // initialize the 64 locks
            for (int i = 0; i < LOCK_COUNT; i++) {
                locks[i] = new Object();
            }

            // create the 4 single-threaded schedulers
            this.senderExecutors = new ScheduledThreadPoolExecutor[SENDER_THREAD_COUNT];
            for (int i = 0; i < SENDER_THREAD_COUNT; i++) {
                ScheduledThreadPoolExecutor stpe = new ScheduledThreadPoolExecutor(
                        1, new NamedThreadFactory("push-sender-" + index + "-" + i, true));
                stpe.setRemoveOnCancelPolicy(true);
                senderExecutors[i] = stpe;
            }
        }

        public void setPushDisruptorManager(PushDisruptorManager manager) {
            this.pushDisruptorManager = manager;
        }

        // ==================== Disruptor consumer entry ====================

        /**
         * Disruptor consumer thread: merge under lock + decide whether to schedule the first sender.
         * Lock held ~100-500ns (just TreeMap.putAll); wsPushProcess never runs on this thread.
         */
        @Override
        public void onEvent(PushEvent event, long sequence, boolean endOfBatch) throws Exception {
            try {
                PushModelTaskGroup incoming = event.getPushModelTaskGroup();
                if (incoming == null || incoming.isEmpty()) {
                    return;
                }

                refreshConfigIfNeeded();

                CombinedKeys batchKey = (CombinedKeys) incoming.getKey();
                boolean shouldSchedule = false;

                // Under the lock: merge + scheduling decision (atomic)
                synchronized (getLock(batchKey)) {
                    PushModelTaskGroup agg = batchMap.get(batchKey);
                    if (agg == null) {
                        // First time: put directly
                        batchMap.put(batchKey, incoming);
                        bufferedItemCount.addAndGet(incoming.getWaitingSize());
                    } else {
                        // Merge
                        int before = agg.getWaitingSize();
                        agg.mergeAdd(incoming);
                        int after = agg.getWaitingSize();
                        bufferedItemCount.addAndGet(Math.max(0, after - before));
                    }

                    // No active sender → mark it + schedule one
                    if (!activeSenders.contains(batchKey)) {
                        activeSenders.add(batchKey);
                        shouldSchedule = true;
                    }
                    // A sender is already running (or waiting to retry): no scheduling needed;
                    // the data is merged into the buffer, and the sender's next copyForUse picks it up
                }

                // Schedule outside the lock (avoid executor calls while holding it)
                if (shouldSchedule) {
                    int slot = selectSlot(batchKey);
                    try {
                        senderExecutors[slot].execute(() -> senderLoop(batchKey, 0));
                    } catch (RejectedExecutionException e) {
                        log.warn("[Handler-{}] sender rejected, key={}", index, batchKey);
                        synchronized (getLock(batchKey)) {
                            activeSenders.remove(batchKey);
                        }
                    }
                }
            } finally {
                event.clear();
                sampleCounter++;
            }
        }

        // ==================== Sender thread logic ====================

        /**
         * Sender main loop: copy → send (lock-free) → remove → loop.
         *
         * Key point: no lock is held while wsPushProcess runs, so the
         * Disruptor thread can merge new data into batchMap at the same time.
         *
         * @param batchKey   target key
         * @param retryCount current retry count (0 on the first run, incremented on failure)
         */
        private void senderLoop(CombinedKeys batchKey, int retryCount) {
            try {
                while (true) {

                    // ---- Phase 1: copy one batch under the lock (at most 500) ----
                    PushModelTaskGroup batch;
                    synchronized (getLock(batchKey)) {
                        PushModelTaskGroup agg = batchMap.get(batchKey);
                        if (agg == null || agg.isEmpty()) {
                            // No data: clean up the key and exit
                            cleanupKey(batchKey);
                            return;
                        }
                        batch = agg.copyForUse(); // max 500
                    }

                    if (batch == null || batch.isEmpty()) {
                        synchronized (getLock(batchKey)) {
                            cleanupKey(batchKey);
                        }
                        return;
                    }

                    int batchSize = batch.getWaitingSize();

                    // ---- Phase 2: lock-free send (the expensive operation) ----
                    try {
                        webSocketPushLauncher.wsPushProcess(batch);
                        retryCount = 0; // success: reset the retry counter
                    } catch (Exception e) {
                        // Failure: keep the data and schedule a retry.
                        // activeSenders stays true → onEvent will not schedule a duplicate.
                        // The Disruptor thread keeps merging new data while the retry waits.
                        log.error("[Handler-{}] send failed, key={}, retry={}, buffered={}",
                                index, batchKey, retryCount,
                                getKeyBufferedSize(batchKey), e);
                        scheduleRetry(batchKey, retryCount + 1);
                        return; // exit the loop and wait for the retry
                    }

                    // ---- Phase 3: remove the sent data under the lock ----
                    synchronized (getLock(batchKey)) {
                        PushModelTaskGroup agg = batchMap.get(batchKey);
                        if (agg != null) {
                            agg.mergeRemove(batch);
                        }
                        bufferedItemCount.addAndGet(-batchSize);
                    }

                    // Loop: back to Phase 1 to check for remaining data
                }
            } catch (Throwable t) {
                // Unexpected error: clean up state so a new sender can be scheduled
                log.error("[Handler-{}] senderLoop unexpected error, key={}",
                        index, batchKey, t);
                synchronized (getLock(batchKey)) {
                    activeSenders.remove(batchKey);
                }
            }
        }

        /**
         * Clears all state for a key. Must be called inside a getLock(key) synchronized block.
         *
         * Atomicity guarantee: the isEmpty check + remove + activeSenders.remove
         * complete under one lock, so there is no race with onEvent's merge + scheduling decision.
         */
        private void cleanupKey(CombinedKeys batchKey) {
            batchMap.remove(batchKey);
            activeSenders.remove(batchKey);
        }

        // ==================== Retry ====================

        /**
         * Schedules an exponential-backoff retry.
         *
         * activeSenders stays true → onEvent will not schedule another sender during the retry.
         * onEvent still merges new data normally → a successful retry drains it together with the backlog.
         *
         * Retries continue past maxRetryCount (so no data is lost), but a critical alarm fires.
         *
         * @param batchKey   target key
         * @param retryCount current retry count (starts at 1)
         */
        private void scheduleRetry(CombinedKeys batchKey, int retryCount) {
            // Alarms
            if (retryCount > ALARM_RETRY_THRESHOLD && retryCount <= maxRetryCount) {
                AlarmUtils.error("[Handler-{}] retry count={}, key={}", index, retryCount, batchKey);
            }
            if (retryCount > maxRetryCount) {
                AlarmUtils.error("[Handler-{}] EXCEEDED maxRetry={}, still retrying, count={}, key={}",
                        index, maxRetryCount, retryCount, batchKey);
            }

            // Exponential backoff: 200, 400, 800, ... capped at 30s (retryCount starts at 1)
            int exponent = Math.min(retryCount, 15);
            long delay = Math.min(BASE_RETRY_DELAY_MS * (1L << exponent), MAX_RETRY_DELAY_MS);

            int slot = selectSlot(batchKey);
            try {
                senderExecutors[slot].schedule(
                        () -> senderLoop(batchKey, retryCount),
                        delay, TimeUnit.MILLISECONDS);
            } catch (RejectedExecutionException e) {
                // Executor already shut down (graceful shutdown)
                log.warn("[Handler-{}] retry schedule rejected, key={}", index, batchKey);
                synchronized (getLock(batchKey)) {
                    activeSenders.remove(batchKey);
                }
            }
        }

        // ==================== Utilities ====================

        /** Shared lock selection: hash spreading + modulo */
        private Object getLock(CombinedKeys key) {
            int h = key.hashCode();
            h ^= (h >>> 16);
            return locks[(h & 0x7FFFFFFF) % LOCK_COUNT];
        }

        /** Sender thread selection: same key → same slot, preserving per-key ordering */
        private int selectSlot(CombinedKeys batchKey) {
            int h = batchKey.hashCode();
            h ^= (h >>> 16);
            return (h & 0x7FFFFFFF) % SENDER_THREAD_COUNT;
        }

        /** Current buffered size for a key (for logging) */
        private int getKeyBufferedSize(CombinedKeys key) {
            PushModelTaskGroup agg = batchMap.get(key);
            return agg != null ? agg.getWaitingSize() : 0;
        }

        // ==================== Lifecycle ====================

        public void destroySenders() {
            for (ScheduledThreadPoolExecutor executor : senderExecutors) {
                if (executor != null) {
                    executor.shutdown();
                    try {
                        if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
                            log.warn("[Handler-{}] sender not terminated in 30s, forcing", index);
                            executor.shutdownNow();
                        }
                    } catch (InterruptedException e) {
                        executor.shutdownNow();
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }

        // ==================== Config ====================

        private void refreshConfigIfNeeded() {
            if (++configRefreshCounter >= CONFIG_REFRESH_EVERY) {
                configRefreshCounter = 0;
                UnifiedCommonConfig config = commonConfigSupplier.get();
                if (config != null) {
                    int configured = config.getPushMaxRetryCount();
                    if (configured > 0) {
                        this.maxRetryCount = configured;
                    }
                }
            }
        }

        // ==================== Monitoring ====================

        public void initMonitor(RingBuffer<PushEvent> ringBuffer) {
            this.ringBuffer = ringBuffer;
            List<Tag> extraTags = List.of(Tag.of("index", String.valueOf(index)));

            metricsReporter.gauge(UtaMetricsEnum.UTA_GAUGE_WS_PUSH_DISRUPTOR_BACKLOG,
                    () -> this.ringBuffer.getBufferSize() - this.ringBuffer.remainingCapacity(),
                    extraTags);
            metricsReporter.gauge(UtaMetricsEnum.UTA_GAUGE_WS_PUSH_DISRUPTOR_QPS,
                    () -> sampleCounter, extraTags);
            metricsReporter.gauge(UtaMetricsEnum.UTA_GAUGE_WS_PUSH_DISRUPTOR_BUFFERED_ITEMS,
                    bufferedItemCount::get, extraTags);
            metricsReporter.gauge(UtaMetricsEnum.UTA_GAUGE_WS_PUSH_DISRUPTOR_BUFFERED_KEYS,
                    this::getBufferedKeyCount, extraTags);

            for (int i = 0; i < SENDER_THREAD_COUNT; i++) {
                final int si = i;
                List<Tag> st = List.of(
                        Tag.of("index", String.valueOf(index)),
                        Tag.of("sender", String.valueOf(i)));
                metricsReporter.gauge(UtaMetricsEnum.UTA_GAUGE_WS_PUSH_SENDER_QUEUE_DEPTH,
                        () -> senderExecutors[si].getQueue().size(), st);
            }
        }

        int getBufferedKeyCount() {
            return batchMap.size();
        }
    }

4.3 PushDisruptorManager (complete class)

package com.upex.unified.account.counter.push.disruptor;

import com.alipay.sofa.jraft.util.DisruptorBuilder;
import com.alipay.sofa.jraft.util.LogExceptionHandler;
import com.alipay.sofa.jraft.util.NamedThreadFactory;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.LiteBlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.upex.unified.account.counter.launcher.WebSocketPushLauncher;
import com.upex.unified.account.counter.model.PushModelTaskGroup;
import com.upex.unified.account.facade.utils.AccountIdJoinUtils;
import com.upex.unified.account.service.config.apollo.UnifiedCommonConfig;
import com.upex.unified.account.service.metrics.reporter.MetricsReporterWrapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;

import java.util.function.Supplier;

@Slf4j
public class PushDisruptorManager {

    private static final int DISRUPTOR_COUNT = 32;
    private static final int RING_BUFFER_SIZE = 1 << 15;
    private static final int MASK = DISRUPTOR_COUNT - 1;

    @SuppressWarnings("unchecked")
    private final Disruptor<PushEvent>[] disruptors
            = (Disruptor<PushEvent>[]) new Disruptor[DISRUPTOR_COUNT];

    @SuppressWarnings("unchecked")
    private final RingBuffer<PushEvent>[] ringBuffers
            = (RingBuffer<PushEvent>[]) new RingBuffer[DISRUPTOR_COUNT];

    private final WebSocketPushLauncher.PushEventHandler[] eventHandlers
            = new WebSocketPushLauncher.PushEventHandler[DISRUPTOR_COUNT];

    private final WebSocketPushLauncher webSocketPushLauncher;
    private final Supplier<UnifiedCommonConfig> commonConfigSupplier;

    public PushDisruptorManager(WebSocketPushLauncher webSocketPushLauncher,
                                Supplier<UnifiedCommonConfig> commonConfigSupplier) {
        this.webSocketPushLauncher = webSocketPushLauncher;
        this.commonConfigSupplier = commonConfigSupplier;
    }

    /** Initialization (change: phase 2 injects the manager reference) */
    public void initPushEngine(MetricsReporterWrapper metricsReporter) {
        try {
            for (int i = 0; i < DISRUPTOR_COUNT; i++) {
                initPushDisruptor(i, metricsReporter);
            }
            // Phase 2: inject the manager reference once all RingBuffers are ready
            for (int i = 0; i < DISRUPTOR_COUNT; i++) {
                eventHandlers[i].setPushDisruptorManager(this);
            }
            log.info("Disruptor init done, total {}", DISRUPTOR_COUNT);
        } catch (Exception e) {
            log.error("initPushEngine error.", e);
        }
    }

    /** Initializes a single Disruptor (unchanged) */
    void initPushDisruptor(int index, MetricsReporterWrapper metricsReporter) {
        String prefix = "pushEngineDisruptor-" + index;
        NamedThreadFactory threadFactory = new NamedThreadFactory(prefix, true);
        disruptors[index] = DisruptorBuilder.<PushEvent>newInstance()
                .setRingBufferSize(RING_BUFFER_SIZE)
                .setEventFactory(new PushEventFactory())
                .setThreadFactory(threadFactory)
                .setProducerType(ProducerType.MULTI)
                .setWaitStrategy(new LiteBlockingWaitStrategy())
                .build();
        eventHandlers[index] = new WebSocketPushLauncher.PushEventHandler(
                index, webSocketPushLauncher, metricsReporter, commonConfigSupplier);
        disruptors[index].handleEventsWith(eventHandlers[index]);
        disruptors[index].setDefaultExceptionHandler(new LogExceptionHandler<>(prefix));
        ringBuffers[index] = disruptors[index].start();
        eventHandlers[index].initMonitor(ringBuffers[index]);
    }

    /** Publishes an event (unchanged) */
    public void publish(String shardingKey, PushModelTaskGroup pushModelTaskGroups) {
        Pair<Long, Long> accountIdPair = AccountIdJoinUtils.mustGetIdsWithOutExe(shardingKey);
        int idx = indexOfMurmurHash3(accountIdPair.getLeft());
        RingBuffer<PushEvent> ringBuffer = ringBuffers[idx];
        long sequence = ringBuffer.next();
        try {
            PushEvent pushEvent = ringBuffer.get(sequence);
            pushEvent.setPushModelTaskGroup(pushModelTaskGroups);
        } finally {
            ringBuffer.publish(sequence);
        }
    }

    int indexOfMurmurHash3(long accountId) {
        long x = accountId;
        x ^= (x >>> 33);
        x *= 0xff51afd7ed558ccdL;
        x ^= (x >>> 33);
        return ((int) x) & MASK;
    }

    /** Graceful shutdown (change: Disruptors first, then Senders) */
    public void destroy() throws Exception {
        for (int i = 0; i < DISRUPTOR_COUNT; i++) {
            if (disruptors[i] != null) disruptors[i].shutdown();
        }
        for (int i = 0; i < DISRUPTOR_COUNT; i++) {
            if (eventHandlers[i] != null) eventHandlers[i].destroySenders();
        }
        log.info("All Disruptors and senders shutdown.");
    }

    public static class PushEventFactory implements EventFactory<PushEvent> {
        @Override
        public PushEvent newInstance() { return new PushEvent(); }
    }
}
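indexOfMurmurHash3 above relies on DISRUPTOR_COUNT being a power of two, so `& MASK` is equivalent to a non-negative modulo even for negative hash values. A small standalone sketch (hypothetical class name; same finalizer constants as the method above):

```java
public class MaskIndex {
    static final int DISRUPTOR_COUNT = 32;   // must be a power of two
    static final int MASK = DISRUPTOR_COUNT - 1;

    // MurmurHash3-style finalizer: mixes all input bits into the low bits
    // before masking, so consecutive accountIds spread across all 32 slots.
    static int indexOf(long accountId) {
        long x = accountId;
        x ^= (x >>> 33);
        x *= 0xff51afd7ed558ccdL;
        x ^= (x >>> 33);
        return ((int) x) & MASK;
    }
}
```

The mask also makes the sign bit irrelevant: `(int) x` may be negative, but `& MASK` keeps the result in [0, 31] without a branch.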

4.4 PushModelTaskGroup (unchanged)

package com.upex.unified.account.counter.model;

import com.upex.mixcontract.common.framework.model.IGroupData;
import com.upex.mixcontract.common.repo.CombinedKeys;
import com.upex.stream.dto.websocket.CacheKey;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.TreeMap;

@Slf4j
@Getter
public class PushModelTaskGroup implements IGroupData<PushModelTaskGroup> {

    private final CacheKey cacheKey;
    private TreeMap<Long, Object> wsPushMap = new TreeMap<>();
    private CombinedKeys key = null;

    public PushModelTaskGroup(CacheKey cacheKey, long version, Object wsPushModel) {
        this.cacheKey = cacheKey;
        if (wsPushModel != null) {
            this.wsPushMap.put(version, wsPushModel);
        }
    }

    @Override
    public Object getKey() {
        if (key == null) {
            this.key = CombinedKeys.builder()
                    .appendColumn(cacheKey.getTopic())
                    .appendColumn(cacheKey.getKey2())
                    .appendColumn(cacheKey.getKey3())
                    .build();
        }
        return key;
    }

    @Override public void mergeAdd(PushModelTaskGroup data) { wsPushMap.putAll(data.getWsPushMap()); }

    @Override
    public void mergeRemove(PushModelTaskGroup data) {
        for (Map.Entry<Long, Object> entry : data.wsPushMap.entrySet()) {
            Object pushModel = this.wsPushMap.get(entry.getKey());
            if (pushModel == entry.getValue()) { this.wsPushMap.remove(entry.getKey()); }
        }
    }

    @Override
    public PushModelTaskGroup copyForUse() {
        PushModelTaskGroup copy = new PushModelTaskGroup(this.cacheKey, 0, null);
        int count = 0;
        for (Map.Entry<Long, Object> entry : this.wsPushMap.entrySet()) {
            copy.wsPushMap.put(entry.getKey(), entry.getValue());
            if (++count >= 500) break;
        }
        return copy;
    }

    @Override public boolean isEmpty() { return wsPushMap.isEmpty(); }
    @Override public void clear() { wsPushMap.clear(); }
    @Override public int getWaitingSize() { return wsPushMap.size(); }
}
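mergeRemove above deliberately uses reference equality (==), not equals: if the Disruptor thread merged a newer object under the same version key while the batch was being sent, the newer object survives the remove and will be sent on the next drain. A minimal demonstration with a hypothetical standalone class:

```java
import java.util.Map;
import java.util.TreeMap;

// Demonstrates mergeRemove's identity check: only the exact instance that
// was copied out gets removed; a value replaced during the send survives.
public class MergeRemoveSketch {
    final TreeMap<Long, Object> wsPushMap = new TreeMap<>();

    void mergeRemove(TreeMap<Long, Object> sent) {
        for (Map.Entry<Long, Object> entry : sent.entrySet()) {
            // Reference equality: remove only if the buffer still holds
            // the same instance that was sent.
            if (wsPushMap.get(entry.getKey()) == entry.getValue()) {
                wsPushMap.remove(entry.getKey());
            }
        }
    }
}
```

This is what makes the lock-free send safe: a concurrent merge that overwrites a version during the send window is never lost to the post-send cleanup.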

5. Detailed Four-Way Performance Comparison

GroupDataProcessor (GDP), 1.8 ReentrantLock (1.8RL), V1 thread affinity, V2 parallel merge-and-send.

5.1 Lock/Synchronization Operations per Event Cycle

| Phase | GDP | 1.8 RL | V1 | V2 |
|---|---|---|---|---|
| merge | 1× synchronized | 1× ReentrantLock | 0 (thread affinity) | 1× synchronized, ~100-500ns |
| scheduling check | 1× CHM.get | 1× CAS | 0 (boolean check) | Set.contains under lock, ~10ns |
| copyForUse | 1× synchronized | 1× ReentrantLock | 0 | 1× synchronized, ~200-1000ns |
| wsPushProcess | identical for all four: lock-free, 1-10ms | | | |
| mergeRemove | 1× synchronized | 1× ReentrantLock | 0 | 1× synchronized, ~200-1000ns |
| total lock ops | 3× sync | 4× RL + CAS | 0 locks + 1 CAS | 3× sync (very short) |
| merge/send parallel? | yes (different threads) | yes (no lock during send) | no (serial on one thread) | yes (no lock during send) |

V2 introduces 3 synchronized sections, but each holds its lock for only ~100-1000ns; in exchange, merge and send become truly parallel.
Compared with V1's zero locks: V1 is lock-free, but merge and send run serially on the same thread, so nothing can be merged during wsPushProcess's 1-10ms.
V2's ~3μs of lock overhead vs V1's 1-10ms of serial waiting → V2's throughput is markedly better.

5.2 Concurrency and Throughput

| Metric | GDP | 1.8 RL | V1 | V2 |
|---|---|---|---|---|
| Disruptor-thread work | N/A | merge while holding RL, ~0.1-10μs | execute submission only, ~0.1μs | merge while holding sync, ~0.1-0.5μs |
| Sender availability (send time / total) | ~99%, only brief lock holds | ~95%, RL contention losses | merge blocked during send, low effective utilization | ~99%, sync is very short |
| High-frequency, 500K evt/s | sync contention | severe RL contention | serial bottleneck | no sync contention (phases interleave) |
| Key isolation | poor (shared pool) | poor (shared sender) | good (1/4 blast radius) | good (1/4 blast radius) |
| Memory per key | ~108B | ~208B | ~56B | ~40B (CHM entry only) |
| Leak risk | | per-key lock leak | | none (cleanupKey) |

5.3 Scenario Comparison

Scenario: high-frequency burst + network jitter (wsPushProcess intermittently slow at 50ms).

| | V1 (thread affinity) | V2 (parallel merge-and-send) |
|---|---|---|
| during a 50ms send | ~50 events queue up in the executor, cannot merge, processed one by one | all ~50 events merge into batchMap; the next copy takes them in one go |
| effective batch size | 1 event per send (serial enqueue) | up to 500 per send (aggregation) |
| wsPushProcess calls | 50 (one per event) | 1 (one aggregated batch) |
| total latency | 50 × (merge + send) ≈ 2.5s | 1 × send ≈ 50ms |

V2's aggregation effect is dramatic: everything arriving during a send is merged into the buffer and sent in one batch on the next drain. V1 processes events one by one, making 50× as many wsPushProcess calls as V2.
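The total-latency figures follow from simple arithmetic under the scenario's assumptions (50 backlogged events, ~50ms per wsPushProcess call during the jitter window); a tiny model, with hypothetical method names:

```java
public class ScenarioMath {
    // V1: one wsPushProcess call per event, executed serially.
    static long v1TotalMs(int events, long sendMs) {
        return events * sendMs;
    }

    // V2: events merge while a send is in flight, so the backlog drains in
    // ceil(events / maxBatch) aggregated calls (maxBatch = 500 per copyForUse).
    static long v2TotalMs(int events, long sendMs, int maxBatch) {
        int calls = (events + maxBatch - 1) / maxBatch;
        return calls * sendMs;
    }
}
```

With 50 events at 50ms each, V1 needs 50 × 50ms = 2500ms while V2 needs a single 50ms call, matching the table above.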

6. Summary

6.1 Core V1 → V2 Improvements

| Issue | V1 | V2 |
|---|---|---|
| merge/send serialization | same thread; wsPushProcess blocks merge | merge on the Disruptor thread, send on a Sender thread, truly parallel |
| KeySendState | private inner class, opaque | eliminated entirely; state split across batchMap + activeSenders |
| retry robustness | 10-retry limit; complex empty-trigger mechanism | 2000 retries, configurable; scheduled directly on the Sender; keeps accumulating |
| first send | submitted via executor.execute | same: executor.execute, submitted immediately (no delay) |

6.2 Design Decisions

| Decision | Choice | Rationale |
|---|---|---|
| synchronization | shared array of 64 Object locks | fixed size, no leaks; uncontended synchronized ~50ns, far better than per-key ReentrantLock |
| merge location | Disruptor thread (under lock) | allows parallel merging during send; lock held <1μs, does not stall the Disruptor |
| retry mechanism | Sender thread schedules directly | simpler than V1's empty-trigger scheme; no extra Disruptor load |
| activeSenders cleanup | cleanupKey, atomic under the lock | mutually exclusive with onEvent's scheduling decision; no race window |

6.3 Scope of Changes

| File | Change |
|---|---|
| PushEvent | unchanged |
| PushEventHandler | fully rewritten (V2: parallel merge-and-send + shared locks + robust retry) |
| PushDisruptorManager | minor changes (manager injection + shutdown order) |
| PushModelTaskGroup | unchanged |
| UtaMetricsEnum | added SENDER_QUEUE_DEPTH |
| UnifiedCommonConfig | added the pushMaxRetryCount config item |

7. Risks and Recommendations

Risk 1: shared lock slot collisions
Different keys can map to the same lock slot (there are 64). Under high frequency, merge and copy for keys sharing a slot may contend briefly.
  • Locks are held <1μs, so the probability of contention is very low
  • If needed, LOCK_COUNT can be raised to 128 or 256
Risk 2: backlog across 2000 retries
In the extreme case of 2000 consecutive failures (at 30s intervals), data keeps accumulating and memory can grow.
  • Monitor via the BUFFERED_ITEMS metric
  • Combine with the maxRetryCount alarm to intervene promptly
Rollout recommendations:
  1. Canary 2-3 handlers; watch the Disruptor backlog and sender queue depths
  2. Compare V1 vs V2: wsPushProcess call counts, P99 latency, buffered key counts
  3. Verify lock contention (e.g. via JFR or async-profiler sampling)

Key Configuration

| Parameter | Default | Description |
|---|---|---|
| SENDER_THREAD_COUNT | 4 | Sender threads per handler |
| LOCK_COUNT | 64 | shared lock array size |
| DEFAULT_MAX_RETRY_COUNT | 2000 | maximum retry count (dynamically configurable via Apollo) |
| BASE_RETRY_DELAY_MS | 100 | base retry delay (the first actual retry waits 200ms) |
| MAX_RETRY_DELAY_MS | 30000 | maximum retry delay |
| copyForUse cap | 500 | maximum items per sent batch |

PushEventHandler V2 - true merge-while-sending - 2026-03-20