|
|
@@ -34,7 +34,11 @@ class CollectUserLogsCommand extends Command
|
|
|
--limit 单次处理最大记录数(默认1000)
|
|
|
--force 强制执行,忽略自动收集配置
|
|
|
|
|
|
-注意:进度追踪基于user_logs表中的已处理记录,系统会自动从上次处理位置继续收集';
|
|
|
+进度追踪机制:
|
|
|
+ - 基于原始日志ID进行进度追踪,确保不遗漏任何记录
|
|
|
+ - 通过user_logs表中的source_id字段自动维护进度
|
|
|
+ - 每个收集器独立追踪进度,无需手动重置
|
|
|
+ - 系统会自动从上次处理的最大ID继续收集';
|
|
|
|
|
|
/**
|
|
|
* 执行命令
|
|
|
@@ -230,9 +234,8 @@ class CollectUserLogsCommand extends Command
|
|
|
$collectorsInfo = $manager->getCollectorsInfo();
|
|
|
$info = $collectorsInfo[$collectorName];
|
|
|
|
|
|
- // 从user_logs表获取最后处理的记录
|
|
|
+ // 从user_logs表获取最后处理的记录ID
|
|
|
$lastProcessedId = $this->getLastProcessedIdFromUserLogs($info['source_table'], $info['source_type']);
|
|
|
- $lastProcessedTimestamp = $this->getLastProcessedTimestampFromUserLogs($info['source_table'], $info['source_type']);
|
|
|
|
|
|
// 获取源表的最大ID
|
|
|
$maxId = $this->getTableMaxId($info['source_table']);
|
|
|
@@ -308,31 +311,8 @@ class CollectUserLogsCommand extends Command
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * 从user_logs表获取最后处理的时间戳
|
|
|
- *
|
|
|
- * @param string $sourceTable
|
|
|
- * @param string $sourceType
|
|
|
- * @return int
|
|
|
- */
|
|
|
- private function getLastProcessedTimestampFromUserLogs(string $sourceTable, string $sourceType): int
|
|
|
- {
|
|
|
- try {
|
|
|
- $lastLog = \App\Module\Game\Models\UserLog::where('source_table', $sourceTable)
|
|
|
- ->where('source_type', $sourceType)
|
|
|
- ->orderBy('created_at', 'desc')
|
|
|
- ->first();
|
|
|
-
|
|
|
- if (!$lastLog) {
|
|
|
- return 0;
|
|
|
- }
|
|
|
-
|
|
|
- // 返回user_logs记录的创建时间戳
|
|
|
- return strtotime($lastLog->created_at);
|
|
|
- } catch (\Exception $e) {
|
|
|
- return 0;
|
|
|
- }
|
|
|
- }
|
|
|
+ // 注意:getLastProcessedTimestampFromUserLogs方法已移除
|
|
|
+ // 当前使用基于ID的进度追踪,不再需要时间戳追踪
|
|
|
|
|
|
/**
|
|
|
* 显示收集器信息
|
|
|
@@ -535,75 +515,34 @@ class CollectUserLogsCommand extends Command
|
|
|
];
|
|
|
}
|
|
|
|
|
|
+ // 注意:getAllRecordsByTimeline方法已移除
|
|
|
+ // 当前使用基于ID的进度追踪,每个收集器独立处理,不再需要全局时间线排序
|
|
|
+
|
|
|
/**
|
|
|
- * 从所有收集器获取按时间线排序的记录
|
|
|
+ * 显示收集器进度信息
|
|
|
+ * 基于ID的进度追踪,显示各收集器的处理状态
|
|
|
*
|
|
|
- * @param UserLogCollectorManager $manager
|
|
|
- * @param int $lastTimestamp
|
|
|
- * @param int $limit
|
|
|
- * @return array
|
|
|
+ * @return void
|
|
|
*/
|
|
|
- private function getAllRecordsByTimeline(UserLogCollectorManager $manager, int $lastTimestamp, int $limit): array
|
|
|
+ private function showTimelineProgress(): void
|
|
|
{
|
|
|
- $allRecords = [];
|
|
|
- $collectorsInfo = $manager->getCollectorsInfo();
|
|
|
+ $this->line("📈 收集器进度状态:");
|
|
|
|
|
|
- foreach ($collectorsInfo as $name => $info) {
|
|
|
- $collector = $manager->getCollector($name);
|
|
|
- if (!$collector) continue;
|
|
|
+ try {
|
|
|
+ $manager = new \App\Module\Game\Logics\UserLogCollectorManager();
|
|
|
+ $collectorsInfo = $manager->getCollectorsInfo();
|
|
|
|
|
|
- try {
|
|
|
- $records = $collector->getNewRecordsByTimePublic($lastTimestamp);
|
|
|
-
|
|
|
- foreach ($records as $record) {
|
|
|
- $timestamp = $collector->getRecordTimestampPublic($record);
|
|
|
-
|
|
|
- $allRecords[] = [
|
|
|
- 'id' => $record->id,
|
|
|
- 'timestamp' => $timestamp,
|
|
|
- 'collector' => $collector,
|
|
|
- 'record' => $record,
|
|
|
- 'source_type' => $info['source_type']
|
|
|
- ];
|
|
|
- }
|
|
|
+ foreach ($collectorsInfo as $name => $info) {
|
|
|
+ $lastProcessedId = $this->getLastProcessedIdFromUserLogs($info['source_table'], $info['source_type']);
|
|
|
+ $maxId = $this->getTableMaxId($info['source_table']);
|
|
|
+ $pendingCount = max(0, $maxId - $lastProcessedId);
|
|
|
|
|
|
- } catch (\Exception $e) {
|
|
|
- \Illuminate\Support\Facades\Log::error("获取收集器记录失败", [
|
|
|
- 'collector' => $name,
|
|
|
- 'error' => $e->getMessage()
|
|
|
- ]);
|
|
|
+ $this->line(" 🔧 {$name}: 最后处理ID <comment>{$lastProcessedId}</comment>, 待处理 <info>{$pendingCount}</info> 条");
|
|
|
}
|
|
|
+ } catch (\Exception $e) {
|
|
|
+ $this->line(" ⚠️ 无法获取进度信息: " . $e->getMessage());
|
|
|
}
|
|
|
|
|
|
- // 限制记录数量
|
|
|
- if (count($allRecords) > $limit) {
|
|
|
- // 先按时间排序,然后取前N条
|
|
|
- usort($allRecords, function($a, $b) {
|
|
|
- $timeA = $a['timestamp'];
|
|
|
- $timeB = $b['timestamp'];
|
|
|
- if ($timeA == $timeB) {
|
|
|
- return $a['id'] <=> $b['id'];
|
|
|
- }
|
|
|
- return $timeA <=> $timeB;
|
|
|
- });
|
|
|
-
|
|
|
- $allRecords = array_slice($allRecords, 0, $limit);
|
|
|
- }
|
|
|
-
|
|
|
- return $allRecords;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 显示时间线进度
|
|
|
- *
|
|
|
- * @return void
|
|
|
- */
|
|
|
- private function showTimelineProgress(): void
|
|
|
- {
|
|
|
- $lastTimestamp = $this->getGlobalLastProcessedTimestamp();
|
|
|
-
|
|
|
- $this->line("📈 时间线处理进度:");
|
|
|
- $this->line(" 🕐 最后处理时间: <comment>" . ($lastTimestamp > 0 ? date('Y-m-d H:i:s', $lastTimestamp) : '未开始') . "</comment>");
|
|
|
$this->line("");
|
|
|
}
|
|
|
|
|
|
@@ -647,52 +586,14 @@ class CollectUserLogsCommand extends Command
|
|
|
$this->line("");
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * 获取全局最后处理时间戳
|
|
|
- * 从各个收集器获取最后处理的原始记录时间戳,取最小值
|
|
|
- *
|
|
|
- * @return int
|
|
|
- */
|
|
|
- private function getGlobalLastProcessedTimestamp(): int
|
|
|
- {
|
|
|
- try {
|
|
|
- $manager = new \App\Module\Game\Logics\UserLogCollectorManager();
|
|
|
- $collectorsInfo = $manager->getCollectorsInfo();
|
|
|
-
|
|
|
- $minTimestamp = PHP_INT_MAX;
|
|
|
- $hasValidTimestamp = false;
|
|
|
-
|
|
|
- foreach ($collectorsInfo as $name => $info) {
|
|
|
- $collector = $manager->getCollector($name);
|
|
|
- if (!$collector) continue;
|
|
|
-
|
|
|
- // 获取该收集器最后处理的时间戳
|
|
|
- $lastProcessedTimestamp = $this->getLastProcessedTimestampFromUserLogs($info['source_table'], $info['source_type']);
|
|
|
-
|
|
|
- if ($lastProcessedTimestamp > 0) {
|
|
|
- $minTimestamp = min($minTimestamp, $lastProcessedTimestamp);
|
|
|
- $hasValidTimestamp = true;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- return $hasValidTimestamp ? $minTimestamp : 0;
|
|
|
- } catch (\Exception $e) {
|
|
|
- return 0;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 更新全局最后处理时间戳
|
|
|
- * 不再需要手动更新,因为进度通过user_logs表自动追踪
|
|
|
- *
|
|
|
- * @param int $timestamp
|
|
|
- * @return void
|
|
|
- */
|
|
|
- private function updateGlobalLastProcessedTimestamp(int $timestamp): void
|
|
|
- {
|
|
|
- // 不再需要手动更新,进度通过user_logs表自动追踪
|
|
|
- // 这个方法保留是为了兼容性
|
|
|
- }
|
|
|
+ // 注意:以下时间戳相关方法已移除,因为当前使用基于ID的进度追踪:
|
|
|
+ // - getGlobalLastProcessedTimestamp(): 全局时间戳追踪已废弃
|
|
|
+ // - updateGlobalLastProcessedTimestamp(): 手动更新时间戳已废弃
|
|
|
+ //
|
|
|
+ // 当前进度追踪机制:
|
|
|
+ // 1. 每个收集器通过getLastProcessedId()获取最后处理的记录ID
|
|
|
+ // 2. 进度通过user_logs表中的source_id字段自动维护
|
|
|
+ // 3. 无需手动重置或更新进度
|
|
|
|
|
|
/**
|
|
|
* 显示用户日志表统计
|