031600-URS达人等级向上传播更新延时队列方案.md 19 KB

URS达人等级向上传播更新 - 延时队列方案

问题分析

当前问题

当某个用户的上级更新了达人指标时,这个更新没有向上传播到更高层级的上级(上上级、上上上级等),导致达人等级统计不准确。

根本原因

  • UrsReferralCreatedListener:只更新直接推荐人,没有向上传播
  • UrsTalentLevelUpListener:只记录日志,没有更新上级用户

项目延时队列技术调研

1. 项目中的延时队列实现

1.1 Laravel标准延时队列

// 使用delay()方法
SomeJob::dispatch($data)->delay(60); // 延迟60秒
SomeJob::dispatch($data)->delay(now()->addMinutes(5)); // 延迟5分钟

// Transfer模块示例
ProcessTransferOrderJob::dispatch($order)
    ->delay(now()->addSeconds(5)); // 延迟5秒处理

1.2 项目自定义延时队列(DelayQueue模块)

// 使用Redis延时队列
Redis::addQueue($callback, $runParam, $delay = 3);

// DQueueJob示例
static protected function jobUpdate($parameter)
{
    Redis::addQueue([static::class, 'updateSync'], $parameter, static::getDelay());
}

1.3 LCache模块的延时队列

// QueueJob延时处理
self::dispatch($parameter)->delay(2)->onQueue($queue);

2. 延时队列的优势

  • 避免雪崩效应:防止大量用户同时升级时造成系统压力
  • 数据一致性:通过延时处理,确保数据更新的顺序性
  • 系统稳定性:分散处理负载,避免阻塞主流程
  • 防重复更新:延时期间可以合并相同用户的多次更新

延时队列方案设计(基于自定义DelayQueue)

1. 核心策略

分层延时处理机制

  • 第1级(直接上级):即时同步处理,确保实时性
  • 第2级(上上级):延时5秒处理,避免频繁更新
  • 第3级及以上:延时递增处理,层级越高延时越长

延时时间设计

$delaySeconds = match($level) {
    1 => 0,           // 直接上级:即时处理
    2 => 5,           // 上上级:延时5秒
    3 => 10,          // 第3级:延时10秒
    4 => 15,          // 第4级:延时15秒
    default => min(($level - 1) * 5, 60) // 最大延时60秒
};

DelayQueue模块特点

  • Redis防重复机制:使用Redis key防止重复任务
  • 自动延时调度:通过Laravel队列系统实现延时执行
  • 回调机制:支持类方法回调,灵活性高

2. 技术实现方案(基于自定义DelayQueue)

核心实现逻辑

// 在UrsTalentLevelUpListener中
private function updateUpstreamTalentLevels(int $userId): void
{
    try {
        $ursUserId = UrsUserMappingService::getMappingUrsUserId($userId);
        if (!$ursUserId) {
            Log::warning('用户URS映射不存在,无法进行向上传播', ['user_id' => $userId]);
            return;
        }

        $referralChain = UrsReferralService::getReferralChain($ursUserId, 20);
        if (empty($referralChain)) {
            Log::info('用户无上级推荐关系,无需向上传播', ['user_id' => $userId]);
            return;
        }

        Log::info('开始URS达人等级延时队列向上传播', [
            'user_id' => $userId,
            'total_upstream_levels' => count($referralChain)
        ]);

        $immediateUpdateCount = 0;
        $delayQueueCount = 0;

        foreach ($referralChain as $level => $ursReferrerId) {
            $referrerId = UrsUserMapping::getFarmUserIdByUrsUserId($ursReferrerId);
            if (!$referrerId) {
                Log::warning('上级用户映射不存在,跳过更新', [
                    'level' => $level,
                    'urs_referrer_id' => $ursReferrerId
                ]);
                continue;
            }

            if ($level === 1) {
                // 第1级(直接上级):即时同步处理
                $this->updateDirectReferrerTalentLevel($userId, $referrerId, $level);
                $immediateUpdateCount++;
            } else {
                // 第2级及以上:DelayQueue延时处理
                $this->addDelayQueueUpdate($userId, $referrerId, $level);
                $delayQueueCount++;
            }
        }

        Log::info('URS达人等级延时队列向上传播完成', [
            'user_id' => $userId,
            'immediate_update_count' => $immediateUpdateCount,
            'delay_queue_count' => $delayQueueCount
        ]);

    } catch (\Exception $e) {
        Log::error('URS达人等级延时队列向上传播失败', [
            'user_id' => $userId,
            'error' => $e->getMessage(),
            'trace' => $e->getTraceAsString()
        ]);
    }
}

/**
 * 即时更新直接上级的达人等级
 */
private function updateDirectReferrerTalentLevel(int $originalUserId, int $referrerId, int $level): void
{
    try {
        Log::info('即时更新直接上级达人等级', [
            'original_user_id' => $originalUserId,
            'referrer_id' => $referrerId,
            'level' => $level
        ]);

        $startTime = microtime(true);
        $result = UrsTalentService::updateTalentLevel($referrerId);
        $endTime = microtime(true);

        Log::info('即时更新直接上级达人等级成功', [
            'original_user_id' => $originalUserId,
            'referrer_id' => $referrerId,
            'level' => $level,
            'new_talent_level' => $result->talentLevel,
            'execution_time_ms' => round(($endTime - $startTime) * 1000, 2)
        ]);

    } catch (\Exception $e) {
        Log::error('即时更新直接上级达人等级失败', [
            'original_user_id' => $originalUserId,
            'referrer_id' => $referrerId,
            'level' => $level,
            'error' => $e->getMessage()
        ]);
    }
}

/**
 * 添加DelayQueue延时更新任务
 */
private function addDelayQueueUpdate(int $originalUserId, int $referrerId, int $level): void
{
    try {
        // 计算延时时间
        $delaySeconds = min(($level - 1) * 5, 60);

        // 准备回调参数
        $callback = [UrsTalentUpstreamUpdateService::class, 'updateTalentLevel'];
        $runParam = [
            'referrer_id' => $referrerId,
            'original_user_id' => $originalUserId,
            'level' => $level,
            'trigger_time' => time()
        ];

        Log::info('添加DelayQueue延时更新任务', [
            'original_user_id' => $originalUserId,
            'referrer_id' => $referrerId,
            'level' => $level,
            'delay_seconds' => $delaySeconds,
            'callback_class' => $callback[0],
            'callback_method' => $callback[1]
        ]);

        // 添加到DelayQueue
        $result = \App\Module\DelayQueue\Redis::addQueue($callback, $runParam, $delaySeconds);

        if ($result === 0) {
            Log::info('DelayQueue任务已存在,跳过重复添加', [
                'referrer_id' => $referrerId,
                'level' => $level
            ]);
        } else {
            Log::info('DelayQueue任务添加成功', [
                'referrer_id' => $referrerId,
                'level' => $level,
                'delay_seconds' => $delaySeconds
            ]);
        }

    } catch (\Exception $e) {
        Log::error('添加DelayQueue延时更新任务失败', [
            'original_user_id' => $originalUserId,
            'referrer_id' => $referrerId,
            'level' => $level,
            'error' => $e->getMessage()
        ]);
    }
}

3. UrsTalentUpstreamUpdateService设计

服务类实现

<?php

namespace App\Module\UrsPromotion\Services;

use App\Module\UrsPromotion\Services\UrsTalentService;
use Illuminate\Support\Facades\Log;
use Illuminate\Support\Facades\Cache;

/**
 * URS达人等级上级更新服务
 *
 * 专门处理DelayQueue延时队列的回调更新逻辑
 */
class UrsTalentUpstreamUpdateService
{
    /**
     * DelayQueue回调方法:更新达人等级
     *
     * 此方法被DelayQueue模块调用,用于延时更新上级用户的达人等级
     *
     * @param array $runParam 运行参数
     * @return bool 更新结果
     */
    public static function updateTalentLevel(array $runParam): bool
    {
        try {
            // 提取参数
            $referrerId = $runParam['referrer_id'] ?? null;
            $originalUserId = $runParam['original_user_id'] ?? null;
            $level = $runParam['level'] ?? null;
            $triggerTime = $runParam['trigger_time'] ?? null;

            // 参数验证
            if (!$referrerId || !$originalUserId || !$level) {
                Log::error('DelayQueue回调参数不完整', [
                    'run_param' => $runParam
                ]);
                return false;
            }

            Log::info('DelayQueue开始更新上级达人等级', [
                'referrer_id' => $referrerId,
                'original_user_id' => $originalUserId,
                'level' => $level,
                'trigger_time' => $triggerTime,
                'delay_duration' => $triggerTime ? (time() - $triggerTime) : null
            ]);

            // 执行达人等级更新(无需防重复检查,因为操作是幂等的)
            $startTime = microtime(true);
            $result = UrsTalentService::updateTalentLevel($referrerId);
            $endTime = microtime(true);

            Log::info('DelayQueue更新上级达人等级成功', [
                'referrer_id' => $referrerId,
                'original_user_id' => $originalUserId,
                'level' => $level,
                'talent_level' => $result->talentLevel,
                'direct_count' => $result->directCount,
                'promotion_count' => $result->promotionCount,
                'execution_time_ms' => round(($endTime - $startTime) * 1000, 2)
            ]);

            return true;

        } catch (\Exception $e) {
            Log::error('DelayQueue更新上级达人等级失败', [
                'referrer_id' => $referrerId ?? null,
                'original_user_id' => $originalUserId ?? null,
                'level' => $level ?? null,
                'error' => $e->getMessage(),
                'trace' => $e->getTraceAsString()
            ]);

            // 返回false,让DelayQueue知道任务失败
            return false;
        }
    }

    /**
     * 批量更新多个用户的达人等级
     *
     * @param array $userIds 用户ID数组
     * @return array 更新结果
     */
    public static function batchUpdateTalentLevels(array $userIds): array
    {
        $results = [];

        foreach ($userIds as $userId) {
            try {
                $result = UrsTalentService::updateTalentLevel($userId);
                $results[$userId] = [
                    'success' => true,
                    'talent_level' => $result->talentLevel,
                    'direct_count' => $result->directCount,
                    'promotion_count' => $result->promotionCount
                ];
            } catch (\Exception $e) {
                $results[$userId] = [
                    'success' => false,
                    'error' => $e->getMessage()
                ];
            }
        }

        Log::info('批量更新达人等级完成', [
            'total_users' => count($userIds),
            'success_count' => count(array_filter($results, fn($r) => $r['success'])),
            'failed_count' => count(array_filter($results, fn($r) => !$r['success']))
        ]);

        return $results;
    }
}

4. DelayQueue防重复机制

DelayQueue内置防重复(充分且必要)

DelayQueue模块本身具有防重复机制,对于幂等操作来说已经足够:

// DelayQueue\Redis::addQueue方法中的防重复逻辑
$key = self::E_KEY . $callback[0] . $callback[1]; // 生成唯一key
if ($a->exists($key)) {
    return 0; // 如果key已存在,返回0表示跳过
}
$a->set($key, 1, $delay); // 设置Redis key,过期时间为延时时间

业务操作幂等性分析

URS达人等级更新操作具有天然的幂等性:

// 每次都重新计算,不是累加操作
$stats = UrsReferralService::getReferralStats($ursUserId);
$newLevel = self::calculateTalentLevel($stats['direct_count'], $stats['total_team_count']);

// 直接覆盖更新,不是递增操作
$talent->update([
    'talent_level' => $newLevel,                    // 直接设置
    'direct_count' => $stats['direct_count'],       // 直接设置
    'promotion_count' => $stats['total_team_count'] // 直接设置
]);

无需额外防重复检查

原因:

  1. DelayQueue防重复:已防止重复任务添加
  2. 操作幂等性:重复执行不会产生错误数据
  3. 简化代码:减少不必要的复杂性
  4. 提高性能:避免额外的Redis锁开销

5. 监控和日志体系

分层日志记录

// 1. 任务分发阶段
Log::info('添加DelayQueue延时更新任务', [
    'original_user_id' => $originalUserId,
    'referrer_id' => $referrerId,
    'level' => $level,
    'delay_seconds' => $delaySeconds,
    'callback_class' => 'UrsTalentUpstreamUpdateService',
    'callback_method' => 'updateTalentLevel'
]);

// 2. 任务执行阶段
Log::info('DelayQueue开始更新上级达人等级', [
    'referrer_id' => $referrerId,
    'original_user_id' => $originalUserId,
    'level' => $level,
    'trigger_time' => $triggerTime,
    'delay_duration' => time() - $triggerTime
]);

// 3. 任务完成阶段
Log::info('DelayQueue更新上级达人等级成功', [
    'referrer_id' => $referrerId,
    'level' => $level,
    'new_talent_level' => $result->talentLevel,
    'execution_time_ms' => round(($endTime - $startTime) * 1000, 2)
]);

性能监控指标

// 执行时间监控
$startTime = microtime(true);
$result = UrsTalentService::updateTalentLevel($referrerId);
$endTime = microtime(true);

// 延时精度监控
$actualDelay = time() - $triggerTime;
$expectedDelay = $delaySeconds;
$delayAccuracy = abs($actualDelay - $expectedDelay);

Log::info('DelayQueue性能统计', [
    'referrer_id' => $referrerId,
    'level' => $level,
    'execution_time_ms' => round(($endTime - $startTime) * 1000, 2),
    'expected_delay' => $expectedDelay,
    'actual_delay' => $actualDelay,
    'delay_accuracy' => $delayAccuracy
]);

错误监控和告警

// 错误分类记录
Log::error('DelayQueue更新失败', [
    'error_type' => 'talent_update_failed',
    'referrer_id' => $referrerId,
    'level' => $level,
    'error_message' => $e->getMessage(),
    'error_code' => $e->getCode(),
    'retry_count' => 0 // DelayQueue不自动重试,需要手动处理
]);

// 关键指标监控
Log::info('DelayQueue统计指标', [
    'total_tasks_added' => $delayQueueCount,
    'immediate_updates' => $immediateUpdateCount,
    'success_rate' => $successCount / $totalCount,
    'average_execution_time' => $avgExecutionTime
]);

方案优势

1. 性能优势

  • 分散负载:通过延时处理,避免瞬时高并发
  • 减少数据库压力:延时合并相同用户的多次更新
  • 系统稳定性:避免雪崩效应

2. 业务优势

  • 数据一致性:确保上级用户等级最终一致
  • 实时性平衡:直接上级即时更新,保证重要数据实时性
  • 可扩展性:支持20级推荐关系的大规模处理

3. 技术优势

  • 防重复机制:Redis锁和队列去重双重保障
  • 错误恢复:队列重试机制保证可靠性
  • 监控完善:详细的日志和性能监控

详细实施计划

第一阶段:核心功能实现(预计2小时)

1.1 创建UrsTalentUpstreamUpdateService(30分钟)

  • 文件位置app/Module/UrsPromotion/Services/UrsTalentUpstreamUpdateService.php
  • 核心方法updateTalentLevel(array $runParam): bool
  • 功能特点
    • DelayQueue回调接口实现
    • Redis锁防重复机制
    • 详细的日志记录
    • 性能监控统计

1.2 修改UrsTalentLevelUpListener(45分钟)

  • 文件位置app/Module/UrsPromotion/Listeners/UrsTalentLevelUpListener.php
  • 修改内容
    • 添加updateUpstreamTalentLevels方法
    • 添加updateDirectReferrerTalentLevel方法
    • 添加addDelayQueueUpdate方法
    • 集成DelayQueue调用逻辑

1.3 修改UrsReferralCreatedListener(30分钟)

  • 文件位置app/Module/UrsPromotion/Listeners/UrsReferralCreatedListener.php
  • 修改内容
    • 复用UrsTalentLevelUpListener的向上传播逻辑
    • 或者提取公共trait/service

1.4 基础测试(15分钟)

  • 单元测试:测试DelayQueue回调方法
  • 集成测试:测试完整的事件流程

第二阶段:DelayQueue配置和优化(预计1小时)

2.1 DelayQueue运行环境检查(20分钟)

  • 检查命令:确认app-delayqueue:run命令正常运行
  • Redis配置:验证Redis连接和key过期机制
  • 队列处理:确认Laravel队列worker正常工作

2.2 延时时间调优(20分钟)

  • 测试不同延时时间:5秒、10秒、15秒等
  • 性能测试:模拟大量用户同时升级
  • 调整延时算法:根据测试结果优化延时计算

2.3 监控配置(20分钟)

  • 日志配置:确保日志正确输出
  • 性能指标:添加关键性能监控
  • 错误告警:配置错误通知机制

第三阶段:全面测试和验证(预计1.5小时)

3.1 功能测试(45分钟)

  • 单用户升级测试:验证向上传播逻辑
  • 多用户并发测试:验证防重复机制
  • 边界情况测试:无上级、映射不存在等

3.2 性能测试(30分钟)

  • 压力测试:模拟100个用户同时升级
  • 延时精度测试:验证DelayQueue延时准确性
  • 资源消耗测试:监控CPU、内存、Redis使用

3.3 数据一致性验证(15分钟)

  • 最终一致性:验证所有上级用户最终都被更新
  • 数据准确性:验证达人等级计算正确
  • 日志完整性:验证所有操作都有日志记录

第四阶段:部署和监控(预计30分钟)

4.1 生产环境部署(15分钟)

  • 代码部署:发布新版本代码
  • 配置检查:验证生产环境DelayQueue配置
  • 服务重启:重启相关服务

4.2 上线监控(15分钟)

  • 实时监控:观察DelayQueue任务执行情况
  • 错误监控:关注错误日志和异常
  • 性能监控:监控系统性能指标

技术优势总结

1. DelayQueue模块优势

  • 内置防重复:Redis key机制自动防止重复任务
  • 灵活回调:支持任意类方法回调,扩展性强
  • 项目集成:与现有架构无缝集成
  • 稳定可靠:基于Laravel队列系统,经过验证

2. 业务优势

  • 分散负载:延时处理避免瞬时高并发
  • 数据一致性:最终一致性保证,直接上级实时更新
  • 系统稳定:避免雪崩效应,提升系统稳定性
  • 可扩展性:支持20级推荐关系的大规模处理

3. 运维优势

  • 监控完善:详细的日志和性能监控
  • 故障恢复:DelayQueue失败不影响主流程
  • 调试友好:清晰的日志便于问题排查
  • 配置灵活:延时时间可根据需要调整

风险控制

1. 技术风险

  • DelayQueue依赖:确保DelayQueue服务正常运行
  • Redis稳定性:Redis故障可能影响防重复机制
  • 队列积压:大量任务可能导致队列积压

2. 缓解措施

  • 监控告警:实时监控DelayQueue运行状态
  • 降级方案:DelayQueue失败时记录日志,后续补偿
  • 容量规划:合理配置队列worker数量
  • 备份机制:重要操作记录到数据库,便于恢复