BaseLogCollector.php 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457
  1. <?php
  2. namespace App\Module\Game\Logics\UserLogCollectors;
  3. use App\Module\Game\Services\UserLogService;
  4. use Illuminate\Support\Facades\Log;
  5. /**
  6. * 用户日志收集器基类
  7. *
  8. * 为各个模块的日志收集器提供基础功能
  9. */
  10. abstract class BaseLogCollector
  11. {
  12. /**
  13. * 收集器名称
  14. *
  15. * @var string
  16. */
  17. protected string $collectorName;
  18. /**
  19. * 源表名
  20. *
  21. * @var string
  22. */
  23. protected string $sourceTable;
  24. /**
  25. * 源类型
  26. *
  27. * @var string
  28. */
  29. protected string $sourceType;
  30. /**
  31. * 最大处理记录数
  32. *
  33. * @var int
  34. */
  35. protected int $maxRecords = 1000;
  36. /**
  37. * 构造函数
  38. */
  39. public function __construct()
  40. {
  41. $this->collectorName = static::class;
  42. }
  43. /**
  44. * 收集日志
  45. *
  46. * @return int 处理的记录数
  47. */
  48. public function collect(): int
  49. {
  50. try {
  51. // 优先使用时间线处理,如果没有时间戳记录则回退到ID处理
  52. $lastProcessedTimestamp = $this->getLastProcessedTimestamp();
  53. if ($lastProcessedTimestamp > 0) {
  54. return $this->collectByTimeline();
  55. } else {
  56. return $this->collectByIdWithTimelineUpgrade();
  57. }
  58. } catch (\Exception $e) {
  59. Log::error("日志收集失败", [
  60. 'collector' => $this->collectorName,
  61. 'error' => $e->getMessage(),
  62. 'trace' => $e->getTraceAsString()
  63. ]);
  64. return 0;
  65. }
  66. }
  67. /**
  68. * 按时间线收集日志
  69. *
  70. * @return int 处理的记录数
  71. */
  72. private function collectByTimeline(): int
  73. {
  74. $lastProcessedTimestamp = $this->getLastProcessedTimestamp();
  75. $records = $this->getNewRecordsByTime($lastProcessedTimestamp);
  76. if ($records->isEmpty()) {
  77. return 0;
  78. }
  79. $processedCount = 0;
  80. $userLogs = [];
  81. $maxTimestamp = $lastProcessedTimestamp;
  82. foreach ($records as $record) {
  83. try {
  84. $userLogData = $this->convertToUserLog($record);
  85. if ($userLogData) {
  86. $userLogs[] = $userLogData;
  87. $processedCount++;
  88. }
  89. $recordTimestamp = $this->getRecordTimestamp($record);
  90. if ($recordTimestamp > $maxTimestamp) {
  91. $maxTimestamp = $recordTimestamp;
  92. }
  93. } catch (\Exception $e) {
  94. Log::error("转换日志记录失败", [
  95. 'collector' => $this->collectorName,
  96. 'record_id' => $record->id ?? null,
  97. 'error' => $e->getMessage()
  98. ]);
  99. }
  100. }
  101. if (!empty($userLogs)) {
  102. UserLogService::batchLog($userLogs);
  103. }
  104. // 不再需要手动更新时间戳,进度通过user_logs表自动追踪
  105. Log::info("时间线日志收集完成", [
  106. 'collector' => $this->collectorName,
  107. 'processed_count' => $processedCount,
  108. 'last_timestamp' => $maxTimestamp
  109. ]);
  110. return $processedCount;
  111. }
  112. /**
  113. * 按ID收集日志并升级到时间线处理
  114. *
  115. * @return int 处理的记录数
  116. */
  117. private function collectByIdWithTimelineUpgrade(): int
  118. {
  119. $lastProcessedId = $this->getLastProcessedId();
  120. $records = $this->getNewRecords($lastProcessedId);
  121. if ($records->isEmpty()) {
  122. return 0;
  123. }
  124. $processedCount = 0;
  125. $userLogs = [];
  126. $maxId = $lastProcessedId;
  127. $maxTimestamp = 0;
  128. foreach ($records as $record) {
  129. try {
  130. $userLogData = $this->convertToUserLog($record);
  131. if ($userLogData) {
  132. $userLogs[] = $userLogData;
  133. $processedCount++;
  134. }
  135. if ($record->id > $maxId) {
  136. $maxId = $record->id;
  137. }
  138. $recordTimestamp = $this->getRecordTimestamp($record);
  139. if ($recordTimestamp > $maxTimestamp) {
  140. $maxTimestamp = $recordTimestamp;
  141. }
  142. } catch (\Exception $e) {
  143. Log::error("转换日志记录失败", [
  144. 'collector' => $this->collectorName,
  145. 'record_id' => $record->id ?? null,
  146. 'error' => $e->getMessage()
  147. ]);
  148. }
  149. }
  150. if (!empty($userLogs)) {
  151. UserLogService::batchLog($userLogs);
  152. }
  153. // 不再需要手动更新进度,进度通过user_logs表自动追踪
  154. Log::info("ID日志收集完成并升级到时间线", [
  155. 'collector' => $this->collectorName,
  156. 'processed_count' => $processedCount,
  157. 'last_id' => $maxId,
  158. 'last_timestamp' => $maxTimestamp
  159. ]);
  160. return $processedCount;
  161. }
  162. /**
  163. * 获取新的记录(子类实现)
  164. *
  165. * @param int $lastProcessedId 上次处理的最大ID
  166. * @return \Illuminate\Database\Eloquent\Collection
  167. */
  168. abstract protected function getNewRecords(int $lastProcessedId);
  169. /**
  170. * 按时间获取新的记录(子类实现)
  171. *
  172. * @param int $lastProcessedTimestamp 上次处理的最大时间戳
  173. * @return \Illuminate\Database\Eloquent\Collection
  174. */
  175. abstract protected function getNewRecordsByTime(int $lastProcessedTimestamp);
  176. /**
  177. * 获取记录的时间戳(子类实现)
  178. *
  179. * @param mixed $record 原始记录
  180. * @return int 时间戳
  181. */
  182. abstract protected function getRecordTimestamp($record): int;
  183. /**
  184. * 根据记录ID获取原始记录的时间戳(子类实现)
  185. *
  186. * @param int $recordId 记录ID
  187. * @return int 时间戳
  188. */
  189. abstract protected function getOriginalRecordTimestamp(int $recordId): int;
  190. /**
  191. * 公共方法:按时间获取新的记录
  192. *
  193. * @param int $lastProcessedTimestamp 上次处理的最大时间戳
  194. * @return \Illuminate\Database\Eloquent\Collection
  195. */
  196. public function getNewRecordsByTimePublic(int $lastProcessedTimestamp)
  197. {
  198. return $this->getNewRecordsByTime($lastProcessedTimestamp);
  199. }
  200. /**
  201. * 公共方法:获取记录的时间戳
  202. *
  203. * @param mixed $record 原始记录
  204. * @return int 时间戳
  205. */
  206. public function getRecordTimestampPublic($record): int
  207. {
  208. return $this->getRecordTimestamp($record);
  209. }
  210. /**
  211. * 公共方法:转换记录为用户日志数据
  212. *
  213. * @param mixed $record 原始记录
  214. * @return array|null 用户日志数据,null表示跳过
  215. */
  216. public function convertToUserLogPublic($record): ?array
  217. {
  218. return $this->convertToUserLog($record);
  219. }
  220. /**
  221. * 转换记录为用户日志数据(子类实现)
  222. *
  223. * @param mixed $record 原始记录
  224. * @return array|null 用户日志数据,null表示跳过
  225. */
  226. abstract protected function convertToUserLog($record): ?array;
  227. /**
  228. * 获取上次处理的最大ID
  229. * 从user_logs表中查询该收集器最后处理的记录ID
  230. *
  231. * @return int
  232. */
  233. protected function getLastProcessedId(): int
  234. {
  235. try {
  236. $lastLog = \App\Module\Game\Models\UserLog::where('source_table', $this->sourceTable)
  237. ->where('source_type', $this->sourceType)
  238. ->orderBy('source_id', 'desc')
  239. ->first();
  240. return $lastLog ? $lastLog->source_id : 0;
  241. } catch (\Exception $e) {
  242. Log::error("获取最后处理ID失败", [
  243. 'collector' => $this->collectorName,
  244. 'source_table' => $this->sourceTable,
  245. 'error' => $e->getMessage()
  246. ]);
  247. return 0;
  248. }
  249. }
  250. /**
  251. * 更新最后处理的ID
  252. * 不再需要手动更新,因为进度通过user_logs表自动追踪
  253. *
  254. * @param int $id
  255. * @return void
  256. */
  257. protected function updateLastProcessedId(int $id): void
  258. {
  259. // 不再需要手动更新,进度通过user_logs表自动追踪
  260. // 这个方法保留是为了兼容性
  261. }
  262. /**
  263. * 获取上次处理的最大时间戳
  264. * 从user_logs表中查询该收集器最后处理的记录的原始时间戳
  265. *
  266. * @return int
  267. */
  268. protected function getLastProcessedTimestamp(): int
  269. {
  270. try {
  271. $lastLog = \App\Module\Game\Models\UserLog::where('source_table', $this->sourceTable)
  272. ->where('source_type', $this->sourceType)
  273. ->whereNotNull('original_time')
  274. ->orderBy('original_time', 'desc')
  275. ->first();
  276. if (!$lastLog || !$lastLog->original_time) {
  277. return 0;
  278. }
  279. // 直接返回原始时间戳
  280. return $lastLog->original_time->timestamp;
  281. } catch (\Exception $e) {
  282. Log::error("获取最后处理时间戳失败", [
  283. 'collector' => $this->collectorName,
  284. 'source_table' => $this->sourceTable,
  285. 'error' => $e->getMessage()
  286. ]);
  287. return 0;
  288. }
  289. }
  290. /**
  291. * 更新最后处理的时间戳
  292. * 不再需要手动更新,因为进度通过user_logs表自动追踪
  293. *
  294. * @param int $timestamp
  295. * @return void
  296. */
  297. protected function updateLastProcessedTimestamp(int $timestamp): void
  298. {
  299. // 不再需要手动更新,进度通过user_logs表自动追踪
  300. // 这个方法保留是为了兼容性
  301. }
  302. /**
  303. * 获取最后处理ID的缓存键
  304. *
  305. * @return string
  306. */
  307. protected function getLastProcessedIdCacheKey(): string
  308. {
  309. return "user_log_collector:last_processed_id:" . $this->sourceTable;
  310. }
  311. /**
  312. * 获取最后处理时间戳的缓存键
  313. *
  314. * @return string
  315. */
  316. protected function getLastProcessedTimestampCacheKey(): string
  317. {
  318. return "user_log_collector:last_processed_timestamp:" . $this->sourceTable;
  319. }
  320. /**
  321. * 创建用户日志数据数组
  322. *
  323. * @param int $userId 用户ID
  324. * @param string $message 日志消息
  325. * @param int $sourceId 来源记录ID
  326. * @param string|null $originalTime 原始时间(业务发生时间),null则使用当前时间
  327. * @return array
  328. */
  329. protected function createUserLogData(int $userId, string $message, int $sourceId, ?string $originalTime = null): array
  330. {
  331. $now = now()->toDateTimeString();
  332. $originalTime = $originalTime ?? $now;
  333. return [
  334. 'user_id' => $userId,
  335. 'message' => $message,
  336. 'source_type' => $this->sourceType,
  337. 'source_id' => $sourceId,
  338. 'source_table' => $this->sourceTable,
  339. 'original_time' => $originalTime, // 原始业务时间
  340. 'collected_at' => $now, // 收集时间
  341. 'created_at' => $now, // 兼容字段
  342. ];
  343. }
  344. /**
  345. * 获取收集器名称
  346. *
  347. * @return string
  348. */
  349. public function getCollectorName(): string
  350. {
  351. return $this->collectorName;
  352. }
  353. /**
  354. * 获取源表名
  355. *
  356. * @return string
  357. */
  358. public function getSourceTable(): string
  359. {
  360. return $this->sourceTable;
  361. }
  362. /**
  363. * 获取源类型
  364. *
  365. * @return string
  366. */
  367. public function getSourceType(): string
  368. {
  369. return $this->sourceType;
  370. }
  371. /**
  372. * 检查是否为重复记录
  373. *
  374. * @param string $sourceTable 源表名
  375. * @param int $sourceId 源记录ID
  376. * @return bool
  377. */
  378. protected function isDuplicateRecord(string $sourceTable, int $sourceId): bool
  379. {
  380. try {
  381. $exists = \App\Module\Game\Models\UserLog::where('source_type', $this->sourceType)
  382. ->where('source_table', $sourceTable)
  383. ->where('source_id', $sourceId)
  384. ->exists();
  385. return $exists;
  386. } catch (\Exception $e) {
  387. Log::error("检查重复记录失败", [
  388. 'collector' => $this->collectorName,
  389. 'source_table' => $sourceTable,
  390. 'source_id' => $sourceId,
  391. 'error' => $e->getMessage()
  392. ]);
  393. return false;
  394. }
  395. }
  396. }