collectorName = static::class; } /** * 收集日志 * * @return int 处理的记录数 */ public function collect(): int { try { // 优先使用时间线处理,如果没有时间戳记录则回退到ID处理 $lastProcessedTimestamp = $this->getLastProcessedTimestamp(); if ($lastProcessedTimestamp > 0) { return $this->collectByTimeline(); } else { return $this->collectByIdWithTimelineUpgrade(); } } catch (\Exception $e) { Log::error("日志收集失败", [ 'collector' => $this->collectorName, 'error' => $e->getMessage(), 'trace' => $e->getTraceAsString() ]); return 0; } } /** * 按时间线收集日志 * * @return int 处理的记录数 */ private function collectByTimeline(): int { $lastProcessedTimestamp = $this->getLastProcessedTimestamp(); $records = $this->getNewRecordsByTime($lastProcessedTimestamp); if ($records->isEmpty()) { return 0; } $processedCount = 0; $userLogs = []; $maxTimestamp = $lastProcessedTimestamp; foreach ($records as $record) { try { $userLogData = $this->convertToUserLog($record); if ($userLogData) { $userLogs[] = $userLogData; $processedCount++; } $recordTimestamp = $this->getRecordTimestamp($record); if ($recordTimestamp > $maxTimestamp) { $maxTimestamp = $recordTimestamp; } } catch (\Exception $e) { Log::error("转换日志记录失败", [ 'collector' => $this->collectorName, 'record_id' => $record->id ?? null, 'error' => $e->getMessage() ]); } } if (!empty($userLogs)) { UserLogService::batchLog($userLogs); } // 不再需要手动更新时间戳,进度通过user_logs表自动追踪 Log::info("时间线日志收集完成", [ 'collector' => $this->collectorName, 'processed_count' => $processedCount, 'last_timestamp' => $maxTimestamp ]); return $processedCount; } /** * 按ID收集日志并升级到时间线处理 * * @return int 处理的记录数 */ private function collectByIdWithTimelineUpgrade(): int { $lastProcessedId = $this->getLastProcessedId(); $records = $this->getNewRecords($lastProcessedId); if ($records->isEmpty()) { return 0; } $processedCount = 0; $userLogs = []; $maxId = $lastProcessedId; $maxTimestamp = 0; foreach ($records as $record) { try { $userLogData = $this->convertToUserLog($record); if ($userLogData) { $userLogs[] = $userLogData; $processedCount++; } if ($record->id > $maxId) { $maxId = $record->id; } $recordTimestamp = $this->getRecordTimestamp($record); if ($recordTimestamp > $maxTimestamp) { $maxTimestamp = $recordTimestamp; } } catch (\Exception $e) { Log::error("转换日志记录失败", [ 'collector' => $this->collectorName, 'record_id' => $record->id ?? null, 'error' => $e->getMessage() ]); } } if (!empty($userLogs)) { UserLogService::batchLog($userLogs); } // 不再需要手动更新进度,进度通过user_logs表自动追踪 Log::info("ID日志收集完成并升级到时间线", [ 'collector' => $this->collectorName, 'processed_count' => $processedCount, 'last_id' => $maxId, 'last_timestamp' => $maxTimestamp ]); return $processedCount; } /** * 获取新的记录(子类实现) * * @param int $lastProcessedId 上次处理的最大ID * @return \Illuminate\Database\Eloquent\Collection */ abstract protected function getNewRecords(int $lastProcessedId); /** * 按时间获取新的记录(子类实现) * * @param int $lastProcessedTimestamp 上次处理的最大时间戳 * @return \Illuminate\Database\Eloquent\Collection */ abstract protected function getNewRecordsByTime(int $lastProcessedTimestamp); /** * 获取记录的时间戳(子类实现) * * @param mixed $record 原始记录 * @return int 时间戳 */ abstract protected function getRecordTimestamp($record): int; /** * 根据记录ID获取原始记录的时间戳(子类实现) * * @param int $recordId 记录ID * @return int 时间戳 */ abstract protected function getOriginalRecordTimestamp(int $recordId): int; /** * 公共方法:按时间获取新的记录 * * @param int $lastProcessedTimestamp 上次处理的最大时间戳 * @return \Illuminate\Database\Eloquent\Collection */ public function getNewRecordsByTimePublic(int $lastProcessedTimestamp) { return $this->getNewRecordsByTime($lastProcessedTimestamp); } /** * 公共方法:获取记录的时间戳 * * @param mixed $record 原始记录 * @return int 时间戳 */ public function getRecordTimestampPublic($record): int { return $this->getRecordTimestamp($record); } /** * 公共方法:转换记录为用户日志数据 * * @param mixed $record 原始记录 * @return array|null 用户日志数据,null表示跳过 */ public function convertToUserLogPublic($record): ?array { return $this->convertToUserLog($record); } /** * 转换记录为用户日志数据(子类实现) * * @param mixed $record 原始记录 * @return array|null 用户日志数据,null表示跳过 */ abstract protected function convertToUserLog($record): ?array; /** * 获取上次处理的最大ID * 从user_logs表中查询该收集器最后处理的记录ID * * @return int */ protected function getLastProcessedId(): int { try { $lastLog = \App\Module\Game\Models\UserLog::where('source_table', $this->sourceTable) ->where('source_type', $this->sourceType) ->orderBy('source_id', 'desc') ->first(); return $lastLog ? $lastLog->source_id : 0; } catch (\Exception $e) { Log::error("获取最后处理ID失败", [ 'collector' => $this->collectorName, 'source_table' => $this->sourceTable, 'error' => $e->getMessage() ]); return 0; } } /** * 更新最后处理的ID * 不再需要手动更新,因为进度通过user_logs表自动追踪 * * @param int $id * @return void */ protected function updateLastProcessedId(int $id): void { // 不再需要手动更新,进度通过user_logs表自动追踪 // 这个方法保留是为了兼容性 } /** * 获取上次处理的最大时间戳 * 从user_logs表中查询该收集器最后处理的记录时间戳 * * @return int */ protected function getLastProcessedTimestamp(): int { try { $lastLog = \App\Module\Game\Models\UserLog::where('source_table', $this->sourceTable) ->where('source_type', $this->sourceType) ->orderBy('created_at', 'desc') ->first(); if (!$lastLog) { return 0; } // 获取原始记录的时间戳 return $this->getOriginalRecordTimestamp($lastLog->source_id); } catch (\Exception $e) { Log::error("获取最后处理时间戳失败", [ 'collector' => $this->collectorName, 'source_table' => $this->sourceTable, 'error' => $e->getMessage() ]); return 0; } } /** * 更新最后处理的时间戳 * 不再需要手动更新,因为进度通过user_logs表自动追踪 * * @param int $timestamp * @return void */ protected function updateLastProcessedTimestamp(int $timestamp): void { // 不再需要手动更新,进度通过user_logs表自动追踪 // 这个方法保留是为了兼容性 } /** * 获取最后处理ID的缓存键 * * @return string */ protected function getLastProcessedIdCacheKey(): string { return "user_log_collector:last_processed_id:" . $this->sourceTable; } /** * 获取最后处理时间戳的缓存键 * * @return string */ protected function getLastProcessedTimestampCacheKey(): string { return "user_log_collector:last_processed_timestamp:" . $this->sourceTable; } /** * 创建用户日志数据数组 * * @param int $userId 用户ID * @param string $message 日志消息 * @param int $sourceId 来源记录ID * @param string|null $createdAt 创建时间,null则使用当前时间 * @return array */ protected function createUserLogData(int $userId, string $message, int $sourceId, ?string $createdAt = null): array { return [ 'user_id' => $userId, 'message' => $message, 'source_type' => $this->sourceType, 'source_id' => $sourceId, 'source_table' => $this->sourceTable, 'created_at' => $createdAt ?? now()->toDateTimeString(), ]; } /** * 获取收集器名称 * * @return string */ public function getCollectorName(): string { return $this->collectorName; } /** * 获取源表名 * * @return string */ public function getSourceTable(): string { return $this->sourceTable; } /** * 获取源类型 * * @return string */ public function getSourceType(): string { return $this->sourceType; } /** * 重置最后处理的ID(用于重新处理) * * @return void */ public function resetLastProcessedId(): void { $cacheKey = $this->getLastProcessedIdCacheKey(); Cache::forget($cacheKey); } /** * 重置最后处理的时间戳(用于重新处理) * * @return void */ public function resetLastProcessedTimestamp(): void { $cacheKey = $this->getLastProcessedTimestampCacheKey(); Cache::forget($cacheKey); } /** * 重置所有处理进度 * * @return void */ public function resetAllProgress(): void { $this->resetLastProcessedId(); $this->resetLastProcessedTimestamp(); } }