当某个用户的上级更新了达人指标时,这个更新没有向上传播到更高层级的上级(上上级、上上上级等),导致达人等级统计不准确。
UrsReferralCreatedListener:只更新直接推荐人,没有向上传播UrsTalentLevelUpListener:只记录日志,没有更新上级用户// 使用delay()方法
SomeJob::dispatch($data)->delay(60); // 延迟60秒
SomeJob::dispatch($data)->delay(now()->addMinutes(5)); // 延迟5分钟
// Transfer模块示例
ProcessTransferOrderJob::dispatch($order)
->delay(now()->addSeconds(5)); // 延迟5秒处理
// 使用Redis延时队列
Redis::addQueue($callback, $runParam, $delay = 3);
// DQueueJob示例
static protected function jobUpdate($parameter)
{
Redis::addQueue([static::class, 'updateSync'], $parameter, static::getDelay());
}
// QueueJob延时处理
self::dispatch($parameter)->delay(2)->onQueue($queue);
$delaySeconds = match($level) {
1 => 0, // 直接上级:即时处理
2 => 5, // 上上级:延时5秒
3 => 10, // 第3级:延时10秒
4 => 15, // 第4级:延时15秒
default => min(($level - 1) * 5, 60) // 最大延时60秒
};
// 在UrsTalentLevelUpListener中
private function updateUpstreamTalentLevels(int $userId): void
{
try {
$ursUserId = UrsUserMappingService::getMappingUrsUserId($userId);
if (!$ursUserId) {
Log::warning('用户URS映射不存在,无法进行向上传播', ['user_id' => $userId]);
return;
}
$referralChain = UrsReferralService::getReferralChain($ursUserId, 20);
if (empty($referralChain)) {
Log::info('用户无上级推荐关系,无需向上传播', ['user_id' => $userId]);
return;
}
Log::info('开始URS达人等级延时队列向上传播', [
'user_id' => $userId,
'total_upstream_levels' => count($referralChain)
]);
$immediateUpdateCount = 0;
$delayQueueCount = 0;
foreach ($referralChain as $level => $ursReferrerId) {
$referrerId = UrsUserMapping::getFarmUserIdByUrsUserId($ursReferrerId);
if (!$referrerId) {
Log::warning('上级用户映射不存在,跳过更新', [
'level' => $level,
'urs_referrer_id' => $ursReferrerId
]);
continue;
}
if ($level === 1) {
// 第1级(直接上级):即时同步处理
$this->updateDirectReferrerTalentLevel($userId, $referrerId, $level);
$immediateUpdateCount++;
} else {
// 第2级及以上:DelayQueue延时处理
$this->addDelayQueueUpdate($userId, $referrerId, $level);
$delayQueueCount++;
}
}
Log::info('URS达人等级延时队列向上传播完成', [
'user_id' => $userId,
'immediate_update_count' => $immediateUpdateCount,
'delay_queue_count' => $delayQueueCount
]);
} catch (\Exception $e) {
Log::error('URS达人等级延时队列向上传播失败', [
'user_id' => $userId,
'error' => $e->getMessage(),
'trace' => $e->getTraceAsString()
]);
}
}
/**
* 即时更新直接上级的达人等级
*/
private function updateDirectReferrerTalentLevel(int $originalUserId, int $referrerId, int $level): void
{
try {
Log::info('即时更新直接上级达人等级', [
'original_user_id' => $originalUserId,
'referrer_id' => $referrerId,
'level' => $level
]);
$startTime = microtime(true);
$result = UrsTalentService::updateTalentLevel($referrerId);
$endTime = microtime(true);
Log::info('即时更新直接上级达人等级成功', [
'original_user_id' => $originalUserId,
'referrer_id' => $referrerId,
'level' => $level,
'new_talent_level' => $result->talentLevel,
'execution_time_ms' => round(($endTime - $startTime) * 1000, 2)
]);
} catch (\Exception $e) {
Log::error('即时更新直接上级达人等级失败', [
'original_user_id' => $originalUserId,
'referrer_id' => $referrerId,
'level' => $level,
'error' => $e->getMessage()
]);
}
}
/**
* 添加DelayQueue延时更新任务
*/
private function addDelayQueueUpdate(int $originalUserId, int $referrerId, int $level): void
{
try {
// 计算延时时间
$delaySeconds = min(($level - 1) * 5, 60);
// 准备回调参数
$callback = [UrsTalentUpstreamUpdateService::class, 'updateTalentLevel'];
$runParam = [
'referrer_id' => $referrerId,
'original_user_id' => $originalUserId,
'level' => $level,
'trigger_time' => time()
];
Log::info('添加DelayQueue延时更新任务', [
'original_user_id' => $originalUserId,
'referrer_id' => $referrerId,
'level' => $level,
'delay_seconds' => $delaySeconds,
'callback_class' => $callback[0],
'callback_method' => $callback[1]
]);
// 添加到DelayQueue
$result = \App\Module\DelayQueue\Redis::addQueue($callback, $runParam, $delaySeconds);
if ($result === 0) {
Log::info('DelayQueue任务已存在,跳过重复添加', [
'referrer_id' => $referrerId,
'level' => $level
]);
} else {
Log::info('DelayQueue任务添加成功', [
'referrer_id' => $referrerId,
'level' => $level,
'delay_seconds' => $delaySeconds
]);
}
} catch (\Exception $e) {
Log::error('添加DelayQueue延时更新任务失败', [
'original_user_id' => $originalUserId,
'referrer_id' => $referrerId,
'level' => $level,
'error' => $e->getMessage()
]);
}
}
<?php
namespace App\Module\UrsPromotion\Services;
use App\Module\UrsPromotion\Services\UrsTalentService;
use Illuminate\Support\Facades\Log;
use Illuminate\Support\Facades\Cache;
/**
* URS达人等级上级更新服务
*
* 专门处理DelayQueue延时队列的回调更新逻辑
*/
class UrsTalentUpstreamUpdateService
{
/**
* DelayQueue回调方法:更新达人等级
*
* 此方法被DelayQueue模块调用,用于延时更新上级用户的达人等级
*
* @param array $runParam 运行参数
* @return bool 更新结果
*/
public static function updateTalentLevel(array $runParam): bool
{
try {
// 提取参数
$referrerId = $runParam['referrer_id'] ?? null;
$originalUserId = $runParam['original_user_id'] ?? null;
$level = $runParam['level'] ?? null;
$triggerTime = $runParam['trigger_time'] ?? null;
// 参数验证
if (!$referrerId || !$originalUserId || !$level) {
Log::error('DelayQueue回调参数不完整', [
'run_param' => $runParam
]);
return false;
}
Log::info('DelayQueue开始更新上级达人等级', [
'referrer_id' => $referrerId,
'original_user_id' => $originalUserId,
'level' => $level,
'trigger_time' => $triggerTime,
'delay_duration' => $triggerTime ? (time() - $triggerTime) : null
]);
// 执行达人等级更新(无需防重复检查,因为操作是幂等的)
$startTime = microtime(true);
$result = UrsTalentService::updateTalentLevel($referrerId);
$endTime = microtime(true);
Log::info('DelayQueue更新上级达人等级成功', [
'referrer_id' => $referrerId,
'original_user_id' => $originalUserId,
'level' => $level,
'talent_level' => $result->talentLevel,
'direct_count' => $result->directCount,
'promotion_count' => $result->promotionCount,
'execution_time_ms' => round(($endTime - $startTime) * 1000, 2)
]);
return true;
} catch (\Exception $e) {
Log::error('DelayQueue更新上级达人等级失败', [
'referrer_id' => $referrerId ?? null,
'original_user_id' => $originalUserId ?? null,
'level' => $level ?? null,
'error' => $e->getMessage(),
'trace' => $e->getTraceAsString()
]);
// 返回false,让DelayQueue知道任务失败
return false;
}
}
/**
* 批量更新多个用户的达人等级
*
* @param array $userIds 用户ID数组
* @return array 更新结果
*/
public static function batchUpdateTalentLevels(array $userIds): array
{
$results = [];
foreach ($userIds as $userId) {
try {
$result = UrsTalentService::updateTalentLevel($userId);
$results[$userId] = [
'success' => true,
'talent_level' => $result->talentLevel,
'direct_count' => $result->directCount,
'promotion_count' => $result->promotionCount
];
} catch (\Exception $e) {
$results[$userId] = [
'success' => false,
'error' => $e->getMessage()
];
}
}
Log::info('批量更新达人等级完成', [
'total_users' => count($userIds),
'success_count' => count(array_filter($results, fn($r) => $r['success'])),
'failed_count' => count(array_filter($results, fn($r) => !$r['success']))
]);
return $results;
}
}
DelayQueue模块本身具有防重复机制,对于幂等操作来说已经足够:
// DelayQueue\Redis::addQueue方法中的防重复逻辑
$key = self::E_KEY . $callback[0] . $callback[1]; // 生成唯一key
if ($a->exists($key)) {
return 0; // 如果key已存在,返回0表示跳过
}
$a->set($key, 1, $delay); // 设置Redis key,过期时间为延时时间
URS达人等级更新操作具有天然的幂等性:
// 每次都重新计算,不是累加操作
$stats = UrsReferralService::getReferralStats($ursUserId);
$newLevel = self::calculateTalentLevel($stats['direct_count'], $stats['total_team_count']);
// 直接覆盖更新,不是递增操作
$talent->update([
'talent_level' => $newLevel, // 直接设置
'direct_count' => $stats['direct_count'], // 直接设置
'promotion_count' => $stats['total_team_count'] // 直接设置
]);
原因:
// 1. 任务分发阶段
Log::info('添加DelayQueue延时更新任务', [
'original_user_id' => $originalUserId,
'referrer_id' => $referrerId,
'level' => $level,
'delay_seconds' => $delaySeconds,
'callback_class' => 'UrsTalentUpstreamUpdateService',
'callback_method' => 'updateTalentLevel'
]);
// 2. 任务执行阶段
Log::info('DelayQueue开始更新上级达人等级', [
'referrer_id' => $referrerId,
'original_user_id' => $originalUserId,
'level' => $level,
'trigger_time' => $triggerTime,
'delay_duration' => time() - $triggerTime
]);
// 3. 任务完成阶段
Log::info('DelayQueue更新上级达人等级成功', [
'referrer_id' => $referrerId,
'level' => $level,
'new_talent_level' => $result->talentLevel,
'execution_time_ms' => round(($endTime - $startTime) * 1000, 2)
]);
// 执行时间监控
$startTime = microtime(true);
$result = UrsTalentService::updateTalentLevel($referrerId);
$endTime = microtime(true);
// 延时精度监控
$actualDelay = time() - $triggerTime;
$expectedDelay = $delaySeconds;
$delayAccuracy = abs($actualDelay - $expectedDelay);
Log::info('DelayQueue性能统计', [
'referrer_id' => $referrerId,
'level' => $level,
'execution_time_ms' => round(($endTime - $startTime) * 1000, 2),
'expected_delay' => $expectedDelay,
'actual_delay' => $actualDelay,
'delay_accuracy' => $delayAccuracy
]);
// 错误分类记录
Log::error('DelayQueue更新失败', [
'error_type' => 'talent_update_failed',
'referrer_id' => $referrerId,
'level' => $level,
'error_message' => $e->getMessage(),
'error_code' => $e->getCode(),
'retry_count' => 0 // DelayQueue不自动重试,需要手动处理
]);
// 关键指标监控
Log::info('DelayQueue统计指标', [
'total_tasks_added' => $delayQueueCount,
'immediate_updates' => $immediateUpdateCount,
'success_rate' => $successCount / $totalCount,
'average_execution_time' => $avgExecutionTime
]);
app/Module/UrsPromotion/Services/UrsTalentUpstreamUpdateService.phpupdateTalentLevel(array $runParam): boolapp/Module/UrsPromotion/Listeners/UrsTalentLevelUpListener.phpupdateUpstreamTalentLevels方法updateDirectReferrerTalentLevel方法addDelayQueueUpdate方法app/Module/UrsPromotion/Listeners/UrsReferralCreatedListener.phpapp-delayqueue:run命令正常运行