SendCallbackJob.php 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. <?php
  2. namespace App\Module\Transfer\Jobs;
  3. use App\Module\Transfer\Logics\CallbackLogic;
  4. use App\Module\Transfer\Models\TransferOrder;
  5. use App\Module\Transfer\Enums\TransferStatus;
  6. use UCore\Queue\QueueJob;
  7. use Illuminate\Support\Facades\Log;
  8. /**
  9. * 发送回调通知任务
  10. */
  11. class SendCallbackJob extends QueueJob
  12. {
  13. private int $orderId;
  14. private int $retryCount;
  15. private int $maxRetries;
  16. /**
  17. * 任务最大尝试次数
  18. */
  19. public int $tries = 1; // 重试逻辑由业务控制
  20. /**
  21. * 任务超时时间(秒)
  22. */
  23. public int $timeout = 60;
  24. /**
  25. * 创建任务实例
  26. */
  27. public function __construct(int $orderId, int $retryCount = 0, int $maxRetries = 3)
  28. {
  29. $this->orderId = $orderId;
  30. $this->retryCount = $retryCount;
  31. $this->maxRetries = $maxRetries;
  32. parent::__construct([
  33. 'order_id' => $orderId,
  34. 'retry_count' => $retryCount,
  35. 'max_retries' => $maxRetries
  36. ]);
  37. // 设置队列名称
  38. $this->onQueue('transfer_callback');
  39. }
  40. /**
  41. * 执行任务
  42. */
  43. public function run(): bool
  44. {
  45. try {
  46. $order = TransferOrder::find($this->orderId);
  47. if (!$order) {
  48. Log::warning('Transfer order not found for callback', [
  49. 'order_id' => $this->orderId
  50. ]);
  51. return false;
  52. }
  53. // 检查订单状态
  54. if ($order->isFinalStatus()) {
  55. Log::info('Transfer order already in final status, skipping callback', [
  56. 'order_id' => $order->id,
  57. 'status' => $order->status->value
  58. ]);
  59. return true;
  60. }
  61. // 检查是否支持回调
  62. if (!$order->transferApp->supportsCallback()) {
  63. Log::info('Transfer app does not support callback, completing order', [
  64. 'order_id' => $order->id,
  65. 'app_id' => $order->transfer_app_id
  66. ]);
  67. $order->updateStatus(TransferStatus::COMPLETED);
  68. return true;
  69. }
  70. Log::info('Sending transfer callback', [
  71. 'order_id' => $order->id,
  72. 'retry_count' => $this->retryCount,
  73. 'callback_url' => $order->transferApp->order_callback_url
  74. ]);
  75. // 发送回调
  76. $success = CallbackLogic::sendCallback($order);
  77. if ($success) {
  78. Log::info('Transfer callback sent successfully', [
  79. 'order_id' => $order->id,
  80. 'retry_count' => $this->retryCount
  81. ]);
  82. } else {
  83. $this->handleCallbackFailure($order);
  84. }
  85. } catch (\Exception $e) {
  86. Log::error('Transfer callback job failed', [
  87. 'order_id' => $this->orderId,
  88. 'retry_count' => $this->retryCount,
  89. 'error' => $e->getMessage(),
  90. 'trace' => $e->getTraceAsString()
  91. ]);
  92. // 处理回调失败
  93. $order = TransferOrder::find($this->orderId);
  94. if ($order) {
  95. $this->handleCallbackFailure($order);
  96. }
  97. return false;
  98. }
  99. return true;
  100. }
  101. /**
  102. * 获取任务数据
  103. *
  104. * @return array
  105. */
  106. public function payload()
  107. {
  108. return [
  109. 'order_id' => $this->orderId,
  110. 'retry_count' => $this->retryCount,
  111. 'max_retries' => $this->maxRetries
  112. ];
  113. }
  114. /**
  115. * 处理回调失败
  116. */
  117. private function handleCallbackFailure(TransferOrder $order): void
  118. {
  119. if ($this->retryCount < $this->maxRetries) {
  120. // 安排重试
  121. $nextRetryCount = $this->retryCount + 1;
  122. $delayMinutes = $this->calculateRetryDelay($nextRetryCount);
  123. Log::info('Scheduling callback retry', [
  124. 'order_id' => $order->id,
  125. 'retry_count' => $nextRetryCount,
  126. 'delay_minutes' => $delayMinutes
  127. ]);
  128. // 更新订单的重试计数
  129. $callbackData = $order->callback_data;
  130. $callbackData['retry_count'] = $nextRetryCount;
  131. $callbackData['last_retry_at'] = now()->toDateTimeString();
  132. $order->update(['callback_data' => $callbackData]);
  133. // 调度重试任务
  134. self::dispatch($order->id, $nextRetryCount, $this->maxRetries)
  135. ->delay(now()->addMinutes($delayMinutes));
  136. } else {
  137. // 达到最大重试次数,记录失败
  138. Log::error('Transfer callback max retries reached', [
  139. 'order_id' => $order->id,
  140. 'retry_count' => $this->retryCount,
  141. 'max_retries' => $this->maxRetries
  142. ]);
  143. // 更新订单状态为失败
  144. $order->updateStatus(TransferStatus::FAILED, '回调发送失败,已达最大重试次数');
  145. }
  146. }
  147. /**
  148. * 计算重试延迟时间(指数退避)
  149. */
  150. private function calculateRetryDelay(int $retryCount): int
  151. {
  152. // 指数退避:1分钟、2分钟、4分钟
  153. return min(pow(2, $retryCount - 1), 30); // 最大30分钟
  154. }
  155. /**
  156. * 任务失败时的处理
  157. */
  158. public function failed(\Throwable $exception): void
  159. {
  160. Log::error('Transfer callback job failed permanently', [
  161. 'order_id' => $this->orderId,
  162. 'retry_count' => $this->retryCount,
  163. 'error' => $exception->getMessage()
  164. ]);
  165. // 更新订单状态为失败
  166. $order = TransferOrder::find($this->orderId);
  167. if ($order && !$order->isFinalStatus()) {
  168. $order->updateStatus(TransferStatus::FAILED, '回调任务失败: ' . $exception->getMessage());
  169. }
  170. }
  171. /**
  172. * 获取任务的唯一ID
  173. */
  174. public function uniqueId(): string
  175. {
  176. return "transfer_callback_{$this->orderId}_{$this->retryCount}";
  177. }
  178. /**
  179. * 静态方法:调度回调任务
  180. */
  181. public static function schedule(int $orderId, int $delayMinutes = 0): void
  182. {
  183. $job = new self($orderId);
  184. if ($delayMinutes > 0) {
  185. $job->delay(now()->addMinutes($delayMinutes));
  186. }
  187. dispatch($job);
  188. }
  189. /**
  190. * 静态方法:立即发送回调
  191. */
  192. public static function sendNow(int $orderId): void
  193. {
  194. dispatch_sync(new self($orderId));
  195. }
  196. /**
  197. * 静态方法:批量调度回调任务
  198. */
  199. public static function scheduleBatch(array $orderIds, int $delayMinutes = 0): void
  200. {
  201. foreach ($orderIds as $orderId) {
  202. self::schedule($orderId, $delayMinutes);
  203. }
  204. }
  205. }