# 重构用户日志收集器架构 **时间**: 2025-06-13 11:11:58 **任务**: 重构用户日志收集器架构,改为每个源表独立收集器,基于ID进度追踪 ## 任务背景 用户反馈收集器定义有问题,应该是每个源表都定义一个收集器,而不是按模块定义。同时,根据时间来判断收集进度可能遗漏数据,需要改为根据自增ID判断。 ## 主要问题 ### 1. 收集器架构问题 - **FarmLogCollector** 处理两个表:`farm_harvest_logs` 和 `farm_upgrade_logs` - **进度追踪复杂**:需要查询两个表的最小ID - **逻辑复杂**:需要合并和排序不同表的数据 ### 2. 时间戳进度追踪问题 - **时间戳相同**:多条记录可能有相同的时间戳 - **时钟回拨**:服务器时间可能被调整 - **并发插入**:高并发情况下时间戳可能不准确 - **时区问题**:不同时区可能导致时间混乱 ## 解决方案 ### 1. 收集器架构重构 #### 原架构 ``` FundLogCollector → fund_logs ItemLogCollector → item_transaction_logs FarmLogCollector → farm_harvest_logs + farm_upgrade_logs (复杂) PointLogCollector → point_logs ``` #### 新架构 ``` FundLogCollector → fund_logs ItemLogCollector → item_transaction_logs FarmHarvestLogCollector → farm_harvest_logs (独立) FarmUpgradeLogCollector → farm_upgrade_logs (独立) PointLogCollector → point_logs ``` ### 2. 进度追踪机制改进 #### 从时间戳追踪改为ID追踪 **原方式(时间戳)**: ```php $lastTimestamp = $this->getLastProcessedTimestamp(); $records = $sourceModel::where('created_at', '>', $lastTimestamp) ->orderBy('created_at') ->limit($this->maxRecords) ->get(); ``` **新方式(ID)**: ```php $lastProcessedId = $this->getLastProcessedId(); $records = $sourceModel::where('id', '>', $lastProcessedId) ->orderBy('id') ->limit($this->maxRecords) ->get(); ``` ## 实施步骤 ### 1. 创建新的收集器 #### FarmHarvestLogCollector - 专门处理 `farm_harvest_logs` 表 - 简化收获日志转换逻辑 - 独立进度追踪 #### FarmUpgradeLogCollector - 专门处理 `farm_upgrade_logs` 表 - 处理房屋和土地升级日志 - 独立进度追踪 ### 2. 修改BaseLogCollector #### 简化collect方法 ```php public function collect(): int { try { // 使用ID进行进度追踪,确保不遗漏数据 return $this->collectById(); } catch (\Exception $e) { Log::error("日志收集失败", [ 'collector' => $this->collectorName, 'error' => $e->getMessage() ]); return 0; } } ``` #### 新的collectById方法 ```php private function collectById(): int { $lastProcessedId = $this->getLastProcessedId(); $records = $this->getNewRecords($lastProcessedId); if ($records->isEmpty()) { return 0; } $userLogs = []; foreach ($records as $record) { $userLogData = $this->convertToUserLog($record); if ($userLogData) { $userLogs[] = $userLogData; } } if (!empty($userLogs)) { // 按原始时间排序后批量保存 usort($userLogs, function($a, $b) { return strtotime($a['original_time']) <=> strtotime($b['original_time']); }); UserLogService::batchLog($userLogs); } return count($userLogs); } ``` ### 3. 移除时间戳相关代码 #### 移除的方法 - `getNewRecordsByTime()` - `getRecordTimestamp()` - `getOriginalRecordTimestamp()` - `getLastProcessedTimestamp()` - `updateLastProcessedTimestamp()` #### 移除的功能 - 重置功能(基于ID追踪无需重置) - 时间戳缓存机制 - 复杂的时间线处理逻辑 ### 4. 更新UserLogCollectorManager ```php private function registerCollectors(): void { $this->collectors = [ 'fund' => new FundLogCollector(), 'item' => new ItemLogCollector(), 'farm_harvest' => new FarmHarvestLogCollector(), 'farm_upgrade' => new FarmUpgradeLogCollector(), 'point' => new PointLogCollector(), ]; } ``` ## 技术优势 ### ID追踪 vs 时间戳追踪对比 | 特性 | 基于ID追踪 | 基于时间追踪 | |------|------------|--------------| | **数据完整性** | ✅ 保证不遗漏 | ❌ 可能遗漏相同时间戳的记录 | | **时钟容错** | ✅ 不受时钟调整影响 | ❌ 受服务器时间影响 | | **并发安全** | ✅ 自增ID唯一 | ❌ 并发时时间戳可能重复 | | **恢复能力** | ✅ 可精确断点续传 | ❌ 时间重叠可能重复处理 | | **实现复杂度** | ✅ 简单直观 | ❌ 需要复杂的时间处理 | ### 架构优势 1. **职责单一**:每个收集器只处理一个表 2. **逻辑简化**:避免复杂的多表查询和合并 3. **易于维护**:独立的收集器便于调试和扩展 4. **性能提升**:减少复杂查询,提高处理效率 ## 保持的功能 ### 双时间戳设计 - **original_time**: 业务发生的原始时间(用于排序显示) - **collected_at**: 日志收集时间(用于监控追踪) ### 原时间排序 - 收集到的日志按原始时间排序 - 确保用户看到的顺序与业务发生顺序一致 ### 批量处理 - 继续使用批量插入提高性能 - 保持2秒延迟收集策略 ## 测试验证 ### 数据完整性验证 ```sql -- 验证收集进度 SELECT source_table, source_type, COUNT(*) as collected_count, MAX(source_id) as max_source_id FROM kku_user_logs GROUP BY source_table, source_type; ``` ### 时间排序验证 ```sql -- 验证时间排序 SELECT id, message, original_time, collected_at, TIMESTAMPDIFF(SECOND, original_time, collected_at) as delay_seconds FROM kku_user_logs ORDER BY original_time DESC LIMIT 10; ``` ## 总结 通过这次重构,用户日志收集系统实现了: 1. **架构简化**:每个表独立收集器,职责清晰 2. **数据完整性**:基于ID追踪,确保不遗漏记录 3. **系统稳定性**:移除复杂的时间处理逻辑 4. **易于扩展**:新增表只需创建对应收集器 这个新架构更加健壮、可靠,为后续的功能扩展奠定了良好的基础。