ProcessTransferOrderJob.php 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275
  1. <?php
  2. namespace App\Module\Transfer\Jobs;
  3. use App\Module\Transfer\Logics\OrderLogic;
  4. use App\Module\Transfer\Models\TransferOrder;
  5. use App\Module\Transfer\Enums\TransferStatus;
  6. use App\Module\Transfer\Enums\TransferType;
  7. use UCore\Queue\QueueJob;
  8. use Illuminate\Support\Facades\Log;
  9. /**
  10. * 处理划转订单任务
  11. */
  12. class ProcessTransferOrderJob extends QueueJob
  13. {
  14. private int $orderId;
  15. private string $action;
  16. /**
  17. * 任务最大尝试次数
  18. */
  19. public int $tries = 3;
  20. /**
  21. * 任务超时时间(秒)
  22. */
  23. public int $timeout = 120;
  24. /**
  25. * 创建任务实例
  26. */
  27. public function __construct(int $orderId, string $action = 'process')
  28. {
  29. $this->orderId = $orderId;
  30. $this->action = $action;
  31. parent::__construct([
  32. 'order_id' => $orderId,
  33. 'action' => $action
  34. ]);
  35. // 设置队列名称
  36. $this->onQueue('transfer');
  37. }
  38. /**
  39. * 执行任务
  40. */
  41. public function run(): bool
  42. {
  43. try {
  44. $order = TransferOrder::find($this->orderId);
  45. if (!$order) {
  46. Log::warning('Transfer order not found for processing', [
  47. 'order_id' => $this->orderId,
  48. 'action' => $this->action
  49. ]);
  50. return false;
  51. }
  52. Log::info('Processing transfer order', [
  53. 'order_id' => $this->orderId,
  54. 'action' => $this->action,
  55. 'type' => $order->type->value,
  56. 'status' => $order->status->value
  57. ]);
  58. // 根据动作类型执行不同的处理
  59. match ($this->action) {
  60. 'process' => $this->processOrder($order),
  61. 'query' => $this->queryOrderStatus($order),
  62. 'retry' => $this->retryOrder($order),
  63. default => Log::warning('Unknown action for transfer order', [
  64. 'order_id' => $this->orderId,
  65. 'action' => $this->action
  66. ])
  67. };
  68. } catch (\Exception $e) {
  69. Log::error('Transfer order processing failed', [
  70. 'order_id' => $this->orderId,
  71. 'action' => $this->action,
  72. 'error' => $e->getMessage(),
  73. 'trace' => $e->getTraceAsString()
  74. ]);
  75. // 重新抛出异常以触发重试机制
  76. throw $e;
  77. }
  78. return true;
  79. }
  80. /**
  81. * 获取任务数据
  82. *
  83. * @return array
  84. */
  85. public function payload()
  86. {
  87. return [
  88. 'order_id' => $this->orderId,
  89. 'action' => $this->action
  90. ];
  91. }
  92. /**
  93. * 处理订单
  94. */
  95. private function processOrder(TransferOrder $order): void
  96. {
  97. // 检查订单状态
  98. if ($order->isFinalStatus()) {
  99. Log::info('Transfer order already in final status', [
  100. 'order_id' => $order->id,
  101. 'status' => $order->status->value
  102. ]);
  103. return;
  104. }
  105. // 根据订单类型处理
  106. if ($order->isTransferOut()) {
  107. $this->processTransferOut($order);
  108. } else {
  109. $this->processTransferIn($order);
  110. }
  111. }
  112. /**
  113. * 处理转出订单
  114. */
  115. private function processTransferOut(TransferOrder $order): void
  116. {
  117. $success = OrderLogic::processTransferOut($order);
  118. if ($success) {
  119. Log::info('Transfer out order processed successfully', [
  120. 'order_id' => $order->id
  121. ]);
  122. // 如果需要查询状态,延迟调度查询任务
  123. if ($order->status === TransferStatus::PROCESSING) {
  124. self::dispatch($order->id, 'query')
  125. ->delay(now()->addMinutes(2));
  126. }
  127. } else {
  128. Log::error('Transfer out order processing failed', [
  129. 'order_id' => $order->id
  130. ]);
  131. }
  132. }
  133. /**
  134. * 处理转入订单
  135. */
  136. private function processTransferIn(TransferOrder $order): void
  137. {
  138. // 转入订单主要是发送回调
  139. if ($order->transferApp->supportsCallback()) {
  140. SendCallbackJob::dispatch($order->id);
  141. } else {
  142. // 没有回调配置,直接完成
  143. $order->updateStatus(TransferStatus::COMPLETED);
  144. }
  145. }
  146. /**
  147. * 查询订单状态
  148. */
  149. private function queryOrderStatus(TransferOrder $order): void
  150. {
  151. if ($order->isTransferOut()) {
  152. $success = OrderLogic::queryTransferOutStatus($order);
  153. } else {
  154. $success = OrderLogic::queryTransferInStatus($order);
  155. }
  156. if ($success) {
  157. Log::info('Transfer order status queried successfully', [
  158. 'order_id' => $order->id,
  159. 'new_status' => $order->fresh()->status->value
  160. ]);
  161. } else {
  162. Log::warning('Transfer order status query failed', [
  163. 'order_id' => $order->id
  164. ]);
  165. // 如果查询失败且订单仍在处理中,安排下次查询
  166. if ($order->status === TransferStatus::PROCESSING) {
  167. self::dispatch($order->id, 'query')
  168. ->delay(now()->addMinutes(5));
  169. }
  170. }
  171. }
  172. /**
  173. * 重试订单
  174. */
  175. private function retryOrder(TransferOrder $order): void
  176. {
  177. if (!$order->canRetry()) {
  178. Log::warning('Transfer order cannot be retried', [
  179. 'order_id' => $order->id,
  180. 'status' => $order->status->value
  181. ]);
  182. return;
  183. }
  184. // 重置状态并重新处理
  185. $order->updateStatus(TransferStatus::CREATED);
  186. $this->processOrder($order);
  187. }
  188. /**
  189. * 任务失败时的处理
  190. */
  191. public function failed(\Throwable $exception): void
  192. {
  193. Log::error('Transfer order job failed permanently', [
  194. 'order_id' => $this->orderId,
  195. 'action' => $this->action,
  196. 'error' => $exception->getMessage(),
  197. 'attempts' => $this->attempts()
  198. ]);
  199. // 更新订单状态为失败
  200. $order = TransferOrder::find($this->orderId);
  201. if ($order && !$order->isFinalStatus()) {
  202. $order->updateStatus(TransferStatus::FAILED, '任务处理失败: ' . $exception->getMessage());
  203. }
  204. }
  205. /**
  206. * 获取任务的唯一ID
  207. */
  208. public function uniqueId(): string
  209. {
  210. return "transfer_order_{$this->orderId}_{$this->action}";
  211. }
  212. /**
  213. * 静态方法:调度处理订单任务
  214. */
  215. public static function scheduleProcess(int $orderId, int $delayMinutes = 0): void
  216. {
  217. $job = new self($orderId, 'process');
  218. if ($delayMinutes > 0) {
  219. $job->delay(now()->addMinutes($delayMinutes));
  220. }
  221. dispatch($job);
  222. }
  223. /**
  224. * 静态方法:调度查询状态任务
  225. */
  226. public static function scheduleQuery(int $orderId, int $delayMinutes = 2): void
  227. {
  228. self::dispatch($orderId, 'query')
  229. ->delay(now()->addMinutes($delayMinutes));
  230. }
  231. /**
  232. * 静态方法:调度重试任务
  233. */
  234. public static function scheduleRetry(int $orderId, int $delayMinutes = 1): void
  235. {
  236. self::dispatch($orderId, 'retry')
  237. ->delay(now()->addMinutes($delayMinutes));
  238. }
  239. }