DelayQueue Job 模块包含延迟队列的任务执行类,负责实际执行延迟任务的业务逻辑。
Job.php)延迟队列的核心任务执行类,继承自 UCore\Queue\QueueJob。
namespace App\Module\DelayQueue\Job;
use App\Module\DelayQueue\Entity\Queue;
use UCore\Queue\QueueJob;
class Job extends QueueJob
{
public function __construct(public Queue $arg) {}
public function run(): bool
{
// 执行回调方法
$res = call_user_func(
[$this->arg->runClass, $this->arg->runMethod],
$this->arg->runParam
);
return true;
}
public function payload()
{
return $this->arg;
}
}
call_user_func 执行指定的类方法dump 输出用于调试graph TB
A[Laravel 队列调度] --> B[Job 构造函数]
B --> C[接收 Queue 实体]
C --> D[run 方法执行]
D --> E[call_user_func 回调]
E --> F[业务逻辑执行]
F --> G[返回执行结果]
G --> H[任务完成]
use App\Module\DelayQueue\Entity\Queue;
use App\Module\DelayQueue\Job\Job;
// 创建 Queue 实体
$queue = new Queue();
$queue->create_ts = time();
$queue->delay_ts = 10;
$queue->runClass = SomeService::class;
$queue->runMethod = 'someMethod';
$queue->runParam = ['param1' => 'value1'];
// 创建并分发任务
$job = new Job($queue);
Job::dispatch($queue)->delay(10);
class ExampleService
{
/**
* 延迟队列回调方法
*
* @param array $runParam 运行参数
* @return mixed
*/
public static function processDelayedTask(array $runParam)
{
$userId = $runParam['user_id'];
$action = $runParam['action'];
// 执行业务逻辑
switch ($action) {
case 'update_stats':
return self::updateUserStats($userId);
case 'send_notification':
return self::sendNotification($userId, $runParam['message']);
default:
throw new \InvalidArgumentException("未知操作: {$action}");
}
}
private static function updateUserStats($userId)
{
// 更新用户统计
return true;
}
private static function sendNotification($userId, $message)
{
// 发送通知
return true;
}
}
Job 类继承了 UCore\Queue\QueueJob 的错误处理机制:
// 基类会自动处理以下情况:
// 1. 任务执行异常
// 2. 重试机制
// 3. 失败任务记录
// 4. 日志记录
class CustomJob extends Job
{
public function run(): bool
{
try {
$result = call_user_func(
[$this->arg->runClass, $this->arg->runMethod],
$this->arg->runParam
);
// 记录成功日志
$this->logInfo('延迟任务执行成功', [
'class' => $this->arg->runClass,
'method' => $this->arg->runMethod,
'result' => $result
]);
return true;
} catch (\Exception $e) {
// 记录错误日志
$this->logError('延迟任务执行失败', [
'class' => $this->arg->runClass,
'method' => $this->arg->runMethod,
'error' => $e->getMessage(),
'params' => $this->arg->runParam
]);
throw $e; // 重新抛出异常,触发重试机制
}
}
}
当前 Job 类包含调试输出:
public function run(): bool
{
// 调试输出任务信息
dump($this->arg, [$this->arg->runClass, $this->arg->runMethod]);
$res = call_user_func([$this->arg->runClass, $this->arg->runMethod], $this->arg->runParam);
// 调试输出执行结果
dump($res);
return true;
}
注意: 生产环境建议移除或替换为日志记录。
// 添加性能监控
public function run(): bool
{
$startTime = microtime(true);
try {
$result = call_user_func(
[$this->arg->runClass, $this->arg->runMethod],
$this->arg->runParam
);
$executionTime = microtime(true) - $startTime;
// 记录性能指标
$this->logInfo('任务执行完成', [
'execution_time' => $executionTime,
'memory_usage' => memory_get_usage(true),
'class' => $this->arg->runClass,
'method' => $this->arg->runMethod
]);
return true;
} catch (\Exception $e) {
$executionTime = microtime(true) - $startTime;
$this->logError('任务执行失败', [
'execution_time' => $executionTime,
'error' => $e->getMessage(),
'class' => $this->arg->runClass,
'method' => $this->arg->runMethod
]);
throw $e;
}
}
// ✅ 推荐:静态方法,清晰的参数
public static function processUser(array $params)
{
$userId = $params['user_id'];
$action = $params['action'];
// 处理逻辑
}
// ❌ 不推荐:实例方法,复杂的依赖
public function processUser($userId, $action, $dependency1, $dependency2)
{
// 处理逻辑
}
// ✅ 推荐:结构化参数
$runParam = [
'user_id' => 123,
'action' => 'update_stats',
'timestamp' => time(),
'metadata' => ['source' => 'api']
];
// ❌ 不推荐:简单值或复杂对象
$runParam = 123; // 太简单
$runParam = $userObject; // 可能序列化问题
public static function robustMethod(array $params)
{
try {
return self::primaryLogic($params);
} catch (\Exception $e) {
// 记录错误但不中断
Log::warning('主逻辑失败,尝试备用方案', [
'error' => $e->getMessage(),
'params' => $params
]);
return self::fallbackLogic($params);
}
}