ShouldQueue.php 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362
  1. <?php
  2. namespace UCore\Queue;
  3. use Illuminate\Contracts\Queue\ShouldQueue as LaravelShouldQueue;
  4. use Illuminate\Queue\InteractsWithQueue;
  5. use Illuminate\Support\Facades\Log;
  6. use UCore\Helper\Logger;
  7. use UCore\Queue\Helper;
  8. /**
  9. * 队列事件基类
  10. *
  11. * 为队列事件监听器提供统一的基础功能,包括:
  12. * - 队列配置管理
  13. * - 错误处理和日志记录
  14. * - 重试机制
  15. * - 事件处理生命周期管理
  16. */
  17. abstract class ShouldQueue implements LaravelShouldQueue, ShouldQueueInterface
  18. {
  19. use InteractsWithQueue;
  20. /**
  21. * 队列名称
  22. *
  23. * @var string|null
  24. */
  25. public $queue = null;
  26. /**
  27. * 最大重试次数
  28. *
  29. * @var int
  30. */
  31. public $tries = 3;
  32. /**
  33. * 任务超时时间(秒)
  34. *
  35. * @var int
  36. */
  37. public $timeout = 300;
  38. /**
  39. * 重试延迟时间(秒)
  40. *
  41. * @var int
  42. */
  43. public $retryAfter = 60;
  44. /**
  45. * 是否在模型缺失时删除任务
  46. *
  47. * @var bool
  48. */
  49. public $deleteWhenMissingModels = true;
  50. /**
  51. * 事件数据
  52. *
  53. * @var object|null
  54. */
  55. protected $event = null;
  56. /**
  57. * 创建事件监听器实例
  58. */
  59. public function __construct()
  60. {
  61. // 子类可以重写此方法进行初始化
  62. }
  63. /**
  64. * Laravel队列系统调用的handle方法
  65. *
  66. * 包含类似QueueJob::handle的逻辑,提供统一的执行流程
  67. *
  68. * @param object $event 事件对象
  69. * @return void
  70. */
  71. public function handle(object $event): void
  72. {
  73. $this->event = $event;
  74. $start = microtime(true);
  75. $queueName = $this->queue ?? 'default';
  76. $className = static::class;
  77. $payload = $this->getEventPayload();
  78. Helper::add_log('handle', $queueName, $className, $payload);
  79. $res = null;
  80. $diff = 0;
  81. try {
  82. $res = $this->run($event);
  83. $diff = microtime(true) - $start;
  84. if ($res) {
  85. // 事件处理成功,记录日志
  86. $this->logInfo('事件处理成功', [
  87. 'event_class' => get_class($event),
  88. 'execution_time' => $diff
  89. ]);
  90. }
  91. } catch (\Throwable $exception) {
  92. $diff = microtime(true) - $start;
  93. $res = false;
  94. Logger::exception('queue_event', $exception);
  95. $desc = $exception->getMessage() . "\n" . $exception->getTraceAsString();
  96. Helper::add_log('Throwable-' . get_class($exception), $queueName, $className, $payload, $desc);
  97. // 记录异常日志
  98. $this->logError('事件处理异常', [
  99. 'event_class' => get_class($event),
  100. 'error' => $exception->getMessage(),
  101. 'trace' => $exception->getTraceAsString()
  102. ]);
  103. // 重新抛出异常以触发重试机制
  104. throw $exception;
  105. }
  106. // 统一在这里记录结束日志
  107. Helper::add_log('runend-' . ($res ? 'true' : 'false'), $queueName, $className, $payload, '', $diff);
  108. }
  109. /**
  110. * 实际运行方法(实现ShouldQueueInterface)
  111. *
  112. * 子类必须实现此方法来处理具体的事件逻辑
  113. *
  114. * @param object $event 事件对象
  115. * @return bool 返回true表示处理成功,false表示处理失败
  116. */
  117. abstract public function run(object $event): bool;
  118. /**
  119. * 获取事件载荷数据
  120. *
  121. * 用于日志记录和调试
  122. *
  123. * @return array
  124. */
  125. protected function getEventPayload(): array
  126. {
  127. if (!$this->event) {
  128. return [];
  129. }
  130. $payload = [
  131. 'event_class' => get_class($this->event),
  132. 'listener_class' => static::class,
  133. 'queue' => $this->queue,
  134. ];
  135. // 尝试获取事件的数据
  136. if (method_exists($this->event, 'toArray')) {
  137. $payload['event_data'] = $this->event->toArray();
  138. } elseif (method_exists($this->event, 'getData')) {
  139. $payload['event_data'] = $this->event->getData();
  140. } else {
  141. // 获取事件的公共属性
  142. $payload['event_data'] = get_object_vars($this->event);
  143. }
  144. return $payload;
  145. }
  146. /**
  147. * 任务失败时的处理
  148. *
  149. * 当任务执行失败或达到最大重试次数时会调用此方法
  150. *
  151. * @param \Throwable $exception 导致任务失败的异常
  152. * @return void
  153. */
  154. public function failed(\Throwable $exception): void
  155. {
  156. $this->logError('队列事件处理失败', [
  157. 'listener_class' => static::class,
  158. 'error' => $exception->getMessage(),
  159. 'trace' => $exception->getTraceAsString(),
  160. 'attempts' => $this->attempts() ?? 0,
  161. ]);
  162. // 子类可以重写此方法实现自定义的失败处理逻辑
  163. $this->handleFailure($exception);
  164. }
  165. /**
  166. * 自定义失败处理逻辑
  167. *
  168. * 子类可以重写此方法实现特定的失败处理逻辑
  169. *
  170. * @param \Throwable $exception
  171. * @return void
  172. */
  173. protected function handleFailure(\Throwable $exception): void
  174. {
  175. // 默认不做任何处理,子类可以重写
  176. }
  177. /**
  178. * 获取重试延迟时间
  179. *
  180. * 支持指数退避策略
  181. *
  182. * @return int|array
  183. */
  184. public function backoff(): int|array
  185. {
  186. $attempt = $this->attempts() ?? 1;
  187. // 指数退避:第1次60秒,第2次120秒,第3次240秒
  188. return min($this->retryAfter * pow(2, $attempt - 1), 3600); // 最大1小时
  189. }
  190. /**
  191. * 记录信息日志
  192. *
  193. * @param string $message 日志消息
  194. * @param array $context 上下文数据
  195. * @return void
  196. */
  197. protected function logInfo(string $message, array $context = []): void
  198. {
  199. $context['listener_class'] = static::class;
  200. $context['queue'] = $this->queue;
  201. Log::info($message, $context);
  202. Logger::info('QueueEvent', $message, $context);
  203. }
  204. /**
  205. * 记录错误日志
  206. *
  207. * @param string $message 日志消息
  208. * @param array $context 上下文数据
  209. * @return void
  210. */
  211. protected function logError(string $message, array $context = []): void
  212. {
  213. $context['listener_class'] = static::class;
  214. $context['queue'] = $this->queue;
  215. Log::error($message, $context);
  216. Logger::error('QueueEvent', $message, $context);
  217. }
  218. /**
  219. * 记录警告日志
  220. *
  221. * @param string $message 日志消息
  222. * @param array $context 上下文数据
  223. * @return void
  224. */
  225. protected function logWarning(string $message, array $context = []): void
  226. {
  227. $context['listener_class'] = static::class;
  228. $context['queue'] = $this->queue;
  229. Log::warning($message, $context);
  230. Logger::warning('QueueEvent', $message, $context);
  231. }
  232. /**
  233. * 安全执行事件处理
  234. *
  235. * 提供统一的异常处理和日志记录
  236. * 注意:此方法主要用于向后兼容,建议直接在handleEvent中实现逻辑
  237. *
  238. * @param object $event 事件对象
  239. * @param callable $handler 处理函数
  240. * @return void
  241. */
  242. protected function safeHandle(object $event, callable $handler): void
  243. {
  244. try {
  245. $this->logInfo('开始处理队列事件', [
  246. 'event_class' => get_class($event),
  247. 'event_data' => method_exists($event, 'toArray') ? $event->toArray() : [],
  248. ]);
  249. $handler($event);
  250. $this->logInfo('队列事件处理完成', [
  251. 'event_class' => get_class($event),
  252. ]);
  253. } catch (\Throwable $exception) {
  254. $this->logError('队列事件处理异常', [
  255. 'event_class' => get_class($event),
  256. 'error' => $exception->getMessage(),
  257. 'trace' => $exception->getTraceAsString(),
  258. ]);
  259. // 重新抛出异常以触发重试机制
  260. throw $exception;
  261. }
  262. }
  263. /**
  264. * 获取事件的唯一标识
  265. *
  266. * 用于去重或追踪
  267. *
  268. * @param object $event
  269. * @return string
  270. */
  271. protected function getEventId(object $event): string
  272. {
  273. $eventClass = get_class($event);
  274. // 尝试获取事件的ID属性
  275. if (property_exists($event, 'id')) {
  276. return $eventClass . ':' . $event->id;
  277. }
  278. // 尝试获取事件的其他标识属性
  279. foreach (['uuid', 'key', 'identifier'] as $property) {
  280. if (property_exists($event, $property)) {
  281. return $eventClass . ':' . $event->$property;
  282. }
  283. }
  284. // 如果没有找到标识属性,使用类名和时间戳
  285. return $eventClass . ':' . microtime(true);
  286. }
  287. /**
  288. * 检查事件是否应该被处理
  289. *
  290. * 子类可以重写此方法实现自定义的过滤逻辑
  291. *
  292. * @param object $event
  293. * @return bool
  294. */
  295. protected function shouldHandle(object $event): bool
  296. {
  297. return true;
  298. }
  299. /**
  300. * 获取任务标签
  301. *
  302. * 用于监控和调试
  303. *
  304. * @return array
  305. */
  306. public function tags(): array
  307. {
  308. return [
  309. 'listener:' . class_basename(static::class),
  310. 'queue:' . ($this->queue ?? 'default'),
  311. ];
  312. }
  313. }