# URS达人等级向上传播更新 - 延时队列方案 ## 问题分析 ### 当前问题 当某个用户的上级更新了达人指标时,这个更新没有向上传播到更高层级的上级(上上级、上上上级等),导致达人等级统计不准确。 ### 根本原因 - `UrsReferralCreatedListener`:只更新直接推荐人,没有向上传播 - `UrsTalentLevelUpListener`:只记录日志,没有更新上级用户 ## 项目延时队列技术调研 ### 1. 项目中的延时队列实现 #### 1.1 Laravel标准延时队列 ```php // 使用delay()方法 SomeJob::dispatch($data)->delay(60); // 延迟60秒 SomeJob::dispatch($data)->delay(now()->addMinutes(5)); // 延迟5分钟 // Transfer模块示例 ProcessTransferOrderJob::dispatch($order) ->delay(now()->addSeconds(5)); // 延迟5秒处理 ``` #### 1.2 项目自定义延时队列(DelayQueue模块) ```php // 使用Redis延时队列 Redis::addQueue($callback, $runParam, $delay = 3); // DQueueJob示例 static protected function jobUpdate($parameter) { Redis::addQueue([static::class, 'updateSync'], $parameter, static::getDelay()); } ``` #### 1.3 LCache模块的延时队列 ```php // QueueJob延时处理 self::dispatch($parameter)->delay(2)->onQueue($queue); ``` ### 2. 延时队列的优势 - **避免雪崩效应**:防止大量用户同时升级时造成系统压力 - **数据一致性**:通过延时处理,确保数据更新的顺序性 - **系统稳定性**:分散处理负载,避免阻塞主流程 - **防重复更新**:延时期间可以合并相同用户的多次更新 ## 延时队列方案设计(基于自定义DelayQueue) ### 1. 核心策略 #### 分层延时处理机制 - **第1级(直接上级)**:即时同步处理,确保实时性 - **第2级(上上级)**:延时5秒处理,避免频繁更新 - **第3级及以上**:延时递增处理,层级越高延时越长 #### 延时时间设计 ```php $delaySeconds = match($level) { 1 => 0, // 直接上级:即时处理 2 => 5, // 上上级:延时5秒 3 => 10, // 第3级:延时10秒 4 => 15, // 第4级:延时15秒 default => min(($level - 1) * 5, 60) // 最大延时60秒 }; ``` #### DelayQueue模块特点 - **Redis防重复机制**:使用Redis key防止重复任务 - **自动延时调度**:通过Laravel队列系统实现延时执行 - **回调机制**:支持类方法回调,灵活性高 ### 2. 技术实现方案(基于自定义DelayQueue) #### 核心实现逻辑 ```php // 在UrsTalentLevelUpListener中 private function updateUpstreamTalentLevels(int $userId): void { try { $ursUserId = UrsUserMappingService::getMappingUrsUserId($userId); if (!$ursUserId) { Log::warning('用户URS映射不存在,无法进行向上传播', ['user_id' => $userId]); return; } $referralChain = UrsReferralService::getReferralChain($ursUserId, 20); if (empty($referralChain)) { Log::info('用户无上级推荐关系,无需向上传播', ['user_id' => $userId]); return; } Log::info('开始URS达人等级延时队列向上传播', [ 'user_id' => $userId, 'total_upstream_levels' => count($referralChain) ]); $immediateUpdateCount = 0; $delayQueueCount = 0; foreach ($referralChain as $level => $ursReferrerId) { $referrerId = UrsUserMapping::getFarmUserIdByUrsUserId($ursReferrerId); if (!$referrerId) { Log::warning('上级用户映射不存在,跳过更新', [ 'level' => $level, 'urs_referrer_id' => $ursReferrerId ]); continue; } if ($level === 1) { // 第1级(直接上级):即时同步处理 $this->updateDirectReferrerTalentLevel($userId, $referrerId, $level); $immediateUpdateCount++; } else { // 第2级及以上:DelayQueue延时处理 $this->addDelayQueueUpdate($userId, $referrerId, $level); $delayQueueCount++; } } Log::info('URS达人等级延时队列向上传播完成', [ 'user_id' => $userId, 'immediate_update_count' => $immediateUpdateCount, 'delay_queue_count' => $delayQueueCount ]); } catch (\Exception $e) { Log::error('URS达人等级延时队列向上传播失败', [ 'user_id' => $userId, 'error' => $e->getMessage(), 'trace' => $e->getTraceAsString() ]); } } /** * 即时更新直接上级的达人等级 */ private function updateDirectReferrerTalentLevel(int $originalUserId, int $referrerId, int $level): void { try { Log::info('即时更新直接上级达人等级', [ 'original_user_id' => $originalUserId, 'referrer_id' => $referrerId, 'level' => $level ]); $startTime = microtime(true); $result = UrsTalentService::updateTalentLevel($referrerId); $endTime = microtime(true); Log::info('即时更新直接上级达人等级成功', [ 'original_user_id' => $originalUserId, 'referrer_id' => $referrerId, 'level' => $level, 'new_talent_level' => $result->talentLevel, 'execution_time_ms' => round(($endTime - $startTime) * 1000, 2) ]); } catch (\Exception $e) { Log::error('即时更新直接上级达人等级失败', [ 'original_user_id' => $originalUserId, 'referrer_id' => $referrerId, 'level' => $level, 'error' => $e->getMessage() ]); } } /** * 添加DelayQueue延时更新任务 */ private function addDelayQueueUpdate(int $originalUserId, int $referrerId, int $level): void { try { // 计算延时时间 $delaySeconds = min(($level - 1) * 5, 60); // 准备回调参数 $callback = [UrsTalentUpstreamUpdateService::class, 'updateTalentLevel']; $runParam = [ 'referrer_id' => $referrerId, 'original_user_id' => $originalUserId, 'level' => $level, 'trigger_time' => time() ]; Log::info('添加DelayQueue延时更新任务', [ 'original_user_id' => $originalUserId, 'referrer_id' => $referrerId, 'level' => $level, 'delay_seconds' => $delaySeconds, 'callback_class' => $callback[0], 'callback_method' => $callback[1] ]); // 添加到DelayQueue $result = \App\Module\DelayQueue\Redis::addQueue($callback, $runParam, $delaySeconds); if ($result === 0) { Log::info('DelayQueue任务已存在,跳过重复添加', [ 'referrer_id' => $referrerId, 'level' => $level ]); } else { Log::info('DelayQueue任务添加成功', [ 'referrer_id' => $referrerId, 'level' => $level, 'delay_seconds' => $delaySeconds ]); } } catch (\Exception $e) { Log::error('添加DelayQueue延时更新任务失败', [ 'original_user_id' => $originalUserId, 'referrer_id' => $referrerId, 'level' => $level, 'error' => $e->getMessage() ]); } } ``` ### 3. UrsTalentUpstreamUpdateService设计 #### 服务类实现 ```php $runParam ]); return false; } Log::info('DelayQueue开始更新上级达人等级', [ 'referrer_id' => $referrerId, 'original_user_id' => $originalUserId, 'level' => $level, 'trigger_time' => $triggerTime, 'delay_duration' => $triggerTime ? (time() - $triggerTime) : null ]); // 执行达人等级更新(无需防重复检查,因为操作是幂等的) $startTime = microtime(true); $result = UrsTalentService::updateTalentLevel($referrerId); $endTime = microtime(true); Log::info('DelayQueue更新上级达人等级成功', [ 'referrer_id' => $referrerId, 'original_user_id' => $originalUserId, 'level' => $level, 'talent_level' => $result->talentLevel, 'direct_count' => $result->directCount, 'promotion_count' => $result->promotionCount, 'execution_time_ms' => round(($endTime - $startTime) * 1000, 2) ]); return true; } catch (\Exception $e) { Log::error('DelayQueue更新上级达人等级失败', [ 'referrer_id' => $referrerId ?? null, 'original_user_id' => $originalUserId ?? null, 'level' => $level ?? null, 'error' => $e->getMessage(), 'trace' => $e->getTraceAsString() ]); // 返回false,让DelayQueue知道任务失败 return false; } } /** * 批量更新多个用户的达人等级 * * @param array $userIds 用户ID数组 * @return array 更新结果 */ public static function batchUpdateTalentLevels(array $userIds): array { $results = []; foreach ($userIds as $userId) { try { $result = UrsTalentService::updateTalentLevel($userId); $results[$userId] = [ 'success' => true, 'talent_level' => $result->talentLevel, 'direct_count' => $result->directCount, 'promotion_count' => $result->promotionCount ]; } catch (\Exception $e) { $results[$userId] = [ 'success' => false, 'error' => $e->getMessage() ]; } } Log::info('批量更新达人等级完成', [ 'total_users' => count($userIds), 'success_count' => count(array_filter($results, fn($r) => $r['success'])), 'failed_count' => count(array_filter($results, fn($r) => !$r['success'])) ]); return $results; } } ``` ### 4. DelayQueue防重复机制 #### DelayQueue内置防重复(充分且必要) DelayQueue模块本身具有防重复机制,对于幂等操作来说已经足够: ```php // DelayQueue\Redis::addQueue方法中的防重复逻辑 $key = self::E_KEY . $callback[0] . $callback[1]; // 生成唯一key if ($a->exists($key)) { return 0; // 如果key已存在,返回0表示跳过 } $a->set($key, 1, $delay); // 设置Redis key,过期时间为延时时间 ``` #### 业务操作幂等性分析 URS达人等级更新操作具有天然的幂等性: ```php // 每次都重新计算,不是累加操作 $stats = UrsReferralService::getReferralStats($ursUserId); $newLevel = self::calculateTalentLevel($stats['direct_count'], $stats['total_team_count']); // 直接覆盖更新,不是递增操作 $talent->update([ 'talent_level' => $newLevel, // 直接设置 'direct_count' => $stats['direct_count'], // 直接设置 'promotion_count' => $stats['total_team_count'] // 直接设置 ]); ``` #### 无需额外防重复检查 **原因:** 1. **DelayQueue防重复**:已防止重复任务添加 2. **操作幂等性**:重复执行不会产生错误数据 3. **简化代码**:减少不必要的复杂性 4. **提高性能**:避免额外的Redis锁开销 ### 5. 监控和日志体系 #### 分层日志记录 ```php // 1. 任务分发阶段 Log::info('添加DelayQueue延时更新任务', [ 'original_user_id' => $originalUserId, 'referrer_id' => $referrerId, 'level' => $level, 'delay_seconds' => $delaySeconds, 'callback_class' => 'UrsTalentUpstreamUpdateService', 'callback_method' => 'updateTalentLevel' ]); // 2. 任务执行阶段 Log::info('DelayQueue开始更新上级达人等级', [ 'referrer_id' => $referrerId, 'original_user_id' => $originalUserId, 'level' => $level, 'trigger_time' => $triggerTime, 'delay_duration' => time() - $triggerTime ]); // 3. 任务完成阶段 Log::info('DelayQueue更新上级达人等级成功', [ 'referrer_id' => $referrerId, 'level' => $level, 'new_talent_level' => $result->talentLevel, 'execution_time_ms' => round(($endTime - $startTime) * 1000, 2) ]); ``` #### 性能监控指标 ```php // 执行时间监控 $startTime = microtime(true); $result = UrsTalentService::updateTalentLevel($referrerId); $endTime = microtime(true); // 延时精度监控 $actualDelay = time() - $triggerTime; $expectedDelay = $delaySeconds; $delayAccuracy = abs($actualDelay - $expectedDelay); Log::info('DelayQueue性能统计', [ 'referrer_id' => $referrerId, 'level' => $level, 'execution_time_ms' => round(($endTime - $startTime) * 1000, 2), 'expected_delay' => $expectedDelay, 'actual_delay' => $actualDelay, 'delay_accuracy' => $delayAccuracy ]); ``` #### 错误监控和告警 ```php // 错误分类记录 Log::error('DelayQueue更新失败', [ 'error_type' => 'talent_update_failed', 'referrer_id' => $referrerId, 'level' => $level, 'error_message' => $e->getMessage(), 'error_code' => $e->getCode(), 'retry_count' => 0 // DelayQueue不自动重试,需要手动处理 ]); // 关键指标监控 Log::info('DelayQueue统计指标', [ 'total_tasks_added' => $delayQueueCount, 'immediate_updates' => $immediateUpdateCount, 'success_rate' => $successCount / $totalCount, 'average_execution_time' => $avgExecutionTime ]); ``` ## 方案优势 ### 1. 性能优势 - **分散负载**:通过延时处理,避免瞬时高并发 - **减少数据库压力**:延时合并相同用户的多次更新 - **系统稳定性**:避免雪崩效应 ### 2. 业务优势 - **数据一致性**:确保上级用户等级最终一致 - **实时性平衡**:直接上级即时更新,保证重要数据实时性 - **可扩展性**:支持20级推荐关系的大规模处理 ### 3. 技术优势 - **防重复机制**:Redis锁和队列去重双重保障 - **错误恢复**:队列重试机制保证可靠性 - **监控完善**:详细的日志和性能监控 ## 详细实施计划 ### 第一阶段:核心功能实现(预计2小时) #### 1.1 创建UrsTalentUpstreamUpdateService(30分钟) - **文件位置**:`app/Module/UrsPromotion/Services/UrsTalentUpstreamUpdateService.php` - **核心方法**:`updateTalentLevel(array $runParam): bool` - **功能特点**: - DelayQueue回调接口实现 - Redis锁防重复机制 - 详细的日志记录 - 性能监控统计 #### 1.2 修改UrsTalentLevelUpListener(45分钟) - **文件位置**:`app/Module/UrsPromotion/Listeners/UrsTalentLevelUpListener.php` - **修改内容**: - 添加`updateUpstreamTalentLevels`方法 - 添加`updateDirectReferrerTalentLevel`方法 - 添加`addDelayQueueUpdate`方法 - 集成DelayQueue调用逻辑 #### 1.3 修改UrsReferralCreatedListener(30分钟) - **文件位置**:`app/Module/UrsPromotion/Listeners/UrsReferralCreatedListener.php` - **修改内容**: - 复用UrsTalentLevelUpListener的向上传播逻辑 - 或者提取公共trait/service #### 1.4 基础测试(15分钟) - 单元测试:测试DelayQueue回调方法 - 集成测试:测试完整的事件流程 ### 第二阶段:DelayQueue配置和优化(预计1小时) #### 2.1 DelayQueue运行环境检查(20分钟) - **检查命令**:确认`app-delayqueue:run`命令正常运行 - **Redis配置**:验证Redis连接和key过期机制 - **队列处理**:确认Laravel队列worker正常工作 #### 2.2 延时时间调优(20分钟) - **测试不同延时时间**:5秒、10秒、15秒等 - **性能测试**:模拟大量用户同时升级 - **调整延时算法**:根据测试结果优化延时计算 #### 2.3 监控配置(20分钟) - **日志配置**:确保日志正确输出 - **性能指标**:添加关键性能监控 - **错误告警**:配置错误通知机制 ### 第三阶段:全面测试和验证(预计1.5小时) #### 3.1 功能测试(45分钟) - **单用户升级测试**:验证向上传播逻辑 - **多用户并发测试**:验证防重复机制 - **边界情况测试**:无上级、映射不存在等 #### 3.2 性能测试(30分钟) - **压力测试**:模拟100个用户同时升级 - **延时精度测试**:验证DelayQueue延时准确性 - **资源消耗测试**:监控CPU、内存、Redis使用 #### 3.3 数据一致性验证(15分钟) - **最终一致性**:验证所有上级用户最终都被更新 - **数据准确性**:验证达人等级计算正确 - **日志完整性**:验证所有操作都有日志记录 ### 第四阶段:部署和监控(预计30分钟) #### 4.1 生产环境部署(15分钟) - **代码部署**:发布新版本代码 - **配置检查**:验证生产环境DelayQueue配置 - **服务重启**:重启相关服务 #### 4.2 上线监控(15分钟) - **实时监控**:观察DelayQueue任务执行情况 - **错误监控**:关注错误日志和异常 - **性能监控**:监控系统性能指标 ## 技术优势总结 ### 1. DelayQueue模块优势 - **内置防重复**:Redis key机制自动防止重复任务 - **灵活回调**:支持任意类方法回调,扩展性强 - **项目集成**:与现有架构无缝集成 - **稳定可靠**:基于Laravel队列系统,经过验证 ### 2. 业务优势 - **分散负载**:延时处理避免瞬时高并发 - **数据一致性**:最终一致性保证,直接上级实时更新 - **系统稳定**:避免雪崩效应,提升系统稳定性 - **可扩展性**:支持20级推荐关系的大规模处理 ### 3. 运维优势 - **监控完善**:详细的日志和性能监控 - **故障恢复**:DelayQueue失败不影响主流程 - **调试友好**:清晰的日志便于问题排查 - **配置灵活**:延时时间可根据需要调整 ## 风险控制 ### 1. 技术风险 - **DelayQueue依赖**:确保DelayQueue服务正常运行 - **Redis稳定性**:Redis故障可能影响防重复机制 - **队列积压**:大量任务可能导致队列积压 ### 2. 缓解措施 - **监控告警**:实时监控DelayQueue运行状态 - **降级方案**:DelayQueue失败时记录日志,后续补偿 - **容量规划**:合理配置队列worker数量 - **备份机制**:重要操作记录到数据库,便于恢复