orderId = $orderId;
        $this->action = $action;

        parent::__construct([
            'order_id' => $orderId,
            'action' => $action
        ]);

        // Set the queue name.
        $this->onQueue('transfer');
    }

    /**
     * Execute the job.
     *
     * Loads the transfer order identified by $this->orderId and routes it to
     * the handler matching $this->action ('process', 'query' or 'retry').
     * An unrecognised action is only logged as a warning.
     *
     * @return bool false when the order cannot be found; true otherwise
     *              (including the unknown-action case, which is logged only).
     *
     * @throws \Throwable Any failure raised while handling the order is logged
     *                    with its context and then rethrown unchanged.
     */
    public function run(): bool
    {
        try {
            $order = TransferOrder::find($this->orderId);

            if (!$order) {
                Log::warning('Transfer order not found for processing', [
                    'order_id' => $this->orderId,
                    'action' => $this->action,
                ]);

                return false;
            }

            Log::info('Processing transfer order', [
                'order_id' => $this->orderId,
                'action' => $this->action,
                'type' => $order->type->value,
                'status' => $order->status->value,
            ]);

            // Run the handler that corresponds to the requested action.
            match ($this->action) {
                'process' => $this->processOrder($order),
                'query' => $this->queryOrderStatus($order),
                'retry' => $this->retryOrder($order),
                default => Log::warning('Unknown action for transfer order', [
                    'order_id' => $this->orderId,
                    'action' => $this->action,
                ]),
            };
        } catch (\Throwable $e) {
            // Catch \Throwable rather than \Exception so that engine errors
            // (\Error: TypeError, calling a method on null, ...) are logged with
            // the same order context instead of bypassing this handler.
            Log::error('Transfer order processing failed', [
                'order_id' => $this->orderId,
                'action' => $this->action,
                'error' => $e->getMessage(),
                'trace' => $e->getTraceAsString(),
            ]);

            // Rethrow so the queue's retry mechanism is triggered.
            throw $e;
        }

        return true;
    }

    /**
     * Get the job data.
     *
     * @return array<string, mixed> The order id and action this job was created with.
     */
    public function payload()
    {
        return [
            'order_id' => $this->orderId,
            'action' => $this->action,
        ];
    }

    /**
     * Process an order.
     *
     * Does nothing (besides logging) when the order is already in a final
     * status; otherwise delegates to the transfer-out or transfer-in handler.
     */
    private function processOrder(TransferOrder $order): void
    {
        // Orders that already reached a final status must not be processed again.
        if ($order->isFinalStatus()) {
            Log::info('Transfer order already in final status', [
                'order_id' => $order->id,
                'status' => $order->status->value,
            ]);

            return;
        }

        // Branch on the order direction.
        if ($order->isTransferOut()) {
            $this->processTransferOut($order);
        } else {
            $this->processTransferIn($order);
        }
    }

    /**
     * Process a transfer-out order.
     *
     * Delegates to OrderLogic::processTransferOut() and, when that reports
     * success while the order is in PROCESSING, schedules a status query
     * two minutes later.
     */
    private function processTransferOut(TransferOrder $order): void
    {
        $success = OrderLogic::processTransferOut($order);

        if ($success) {
            Log::info('Transfer out order processed successfully', [
                'order_id' => $order->id,
            ]);

            // Schedule a delayed status query when the order still needs polling.
            // NOTE(review): this reads the in-memory $order; presumably
            // OrderLogic updates this same instance - confirm.
            if ($order->status === TransferStatus::PROCESSING) {
                self::dispatch($order->id, 'query')
                    ->delay(now()->addMinutes(2));
            }
        } else {
            Log::error('Transfer out order processing failed', [
                'order_id' => $order->id,
            ]);
        }
    }

    /**
     * Process a transfer-in order.
     *
     * A transfer-in order mainly needs its callback sent: when the owning app
     * supports callbacks a SendCallbackJob is dispatched, otherwise the order
     * is marked COMPLETED directly.
     */
    private function processTransferIn(TransferOrder $order): void
    {
        if ($order->transferApp->supportsCallback()) {
            SendCallbackJob::dispatch($order->id);
        } else {
            // No callback configured: complete the order right away.
            $order->updateStatus(TransferStatus::COMPLETED);
        }
    }

    /**
     * Query the order status.
     *
     * Delegates to the OrderLogic query matching the order direction. When the
     * query fails and the order is still PROCESSING, another query is scheduled
     * five minutes later.
     *
     * NOTE(review): a successful query that leaves the order in PROCESSING does
     * not schedule a follow-up query here - confirm that is handled elsewhere.
     */
    private function queryOrderStatus(TransferOrder $order): void
    {
        if ($order->isTransferOut()) {
            $success = OrderLogic::queryTransferOutStatus($order);
        } else {
            $success = OrderLogic::queryTransferInStatus($order);
        }

        if ($success) {
            Log::info('Transfer order status queried successfully', [
                'order_id' => $order->id,
                // fresh() returns null when the row no longer exists; the
                // nullsafe operator keeps this log line from raising an error.
                'new_status' => $order->fresh()?->status->value,
            ]);
        } else {
            Log::warning('Transfer order status query failed', [
                'order_id' => $order->id,
            ]);

            // The query failed and the order is still in progress: try again later.
            if ($order->status === TransferStatus::PROCESSING) {
                self::dispatch($order->id, 'query')
                    ->delay(now()->addMinutes(5));
            }
        }
    }

    /**
     * Retry an order.
     *
     * When the order reports it can be retried, its status is reset to CREATED
     * and it goes through processOrder() again; otherwise only a warning is
     * logged.
     */
    private function retryOrder(TransferOrder $order): void
    {
        if (!$order->canRetry()) {
            Log::warning('Transfer order cannot be retried', [
                'order_id' => $order->id,
                'status' => $order->status->value,
            ]);

            return;
        }

        // Reset the status, then process the order from the start.
        $order->updateStatus(TransferStatus::CREATED);
        $this->processOrder($order);
    }

    /**
     * Handle a job failure.
     *
     * Logs the failure and, if the order exists and is not already in a final
     * status, marks it FAILED with the exception message attached.
     */
    public function failed(\Throwable $exception): void
    {
        Log::error('Transfer order job failed permanently', [
            'order_id' => $this->orderId,
            'action' => $this->action,
            'error' => $exception->getMessage(),
            'attempts' => $this->attempts(),
        ]);

        // Mark the order as failed unless it already reached a final status.
        $order = TransferOrder::find($this->orderId);

        if ($order && !$order->isFinalStatus()) {
            $order->updateStatus(TransferStatus::FAILED, '任务处理失败: ' . $exception->getMessage());
        }
    }

    /**
     * Get the unique ID of the job, built from the order id and the action.
     */
    public function uniqueId(): string
    {
        return "transfer_order_{$this->orderId}_{$this->action}";
    }

    /**
     * Static helper: dispatch a 'process' job for the order.
     *
     * The delay is applied only when $delayMinutes is greater than zero.
     */
    public static function scheduleProcess(int $orderId, int $delayMinutes = 0): void
    {
        $job = new self($orderId, 'process');

        if ($delayMinutes > 0) {
            $job->delay(now()->addMinutes($delayMinutes));
        }

        dispatch($job);
    }

    /**
     * Static helper: dispatch a 'query' job delayed by $delayMinutes minutes.
     */
    public static function scheduleQuery(int $orderId, int $delayMinutes = 2): void
    {
        self::dispatch($orderId, 'query')
            ->delay(now()->addMinutes($delayMinutes));
    }

    /**
     * Static helper: dispatch a 'retry' job delayed by $delayMinutes minutes.
     */
    public static function scheduleRetry(int $orderId, int $delayMinutes = 1): void
    {
        self::dispatch($orderId, 'retry')
            ->delay(now()->addMinutes($delayMinutes));
    }
}