131158-重构用户日志收集器架构.md 6.1 KB

重构用户日志收集器架构

时间: 2025-06-13 11:11:58
任务: 重构用户日志收集器架构,改为每个源表独立收集器,基于ID进度追踪

任务背景

用户反馈收集器定义有问题,应该是每个源表都定义一个收集器,而不是按模块定义。同时,根据时间来判断收集进度可能遗漏数据,需要改为根据自增ID判断。

主要问题

1. 收集器架构问题

  • FarmLogCollector 处理两个表:farm_harvest_logsfarm_upgrade_logs
  • 进度追踪复杂:需要查询两个表的最小ID
  • 逻辑复杂:需要合并和排序不同表的数据

2. 时间戳进度追踪问题

  • 时间戳相同:多条记录可能有相同的时间戳
  • 时钟回拨:服务器时间可能被调整
  • 并发插入:高并发情况下时间戳可能不准确
  • 时区问题:不同时区可能导致时间混乱

解决方案

1. 收集器架构重构

原架构

FundLogCollector → fund_logs
ItemLogCollector → item_transaction_logs  
FarmLogCollector → farm_harvest_logs + farm_upgrade_logs (复杂)
PointLogCollector → point_logs

新架构

FundLogCollector → fund_logs
ItemLogCollector → item_transaction_logs
FarmHarvestLogCollector → farm_harvest_logs (独立)
FarmUpgradeLogCollector → farm_upgrade_logs (独立)
PointLogCollector → point_logs

2. 进度追踪机制改进

从时间戳追踪改为ID追踪

原方式(时间戳)

$lastTimestamp = $this->getLastProcessedTimestamp();
$records = $sourceModel::where('created_at', '>', $lastTimestamp)
    ->orderBy('created_at')
    ->limit($this->maxRecords)
    ->get();

新方式(ID)

$lastProcessedId = $this->getLastProcessedId();
$records = $sourceModel::where('id', '>', $lastProcessedId)
    ->orderBy('id')
    ->limit($this->maxRecords)
    ->get();

实施步骤

1. 创建新的收集器

FarmHarvestLogCollector

  • 专门处理 farm_harvest_logs
  • 简化收获日志转换逻辑
  • 独立进度追踪

FarmUpgradeLogCollector

  • 专门处理 farm_upgrade_logs
  • 处理房屋和土地升级日志
  • 独立进度追踪

2. 修改BaseLogCollector

简化collect方法

public function collect(): int
{
    try {
        // 使用ID进行进度追踪,确保不遗漏数据
        return $this->collectById();
    } catch (\Exception $e) {
        Log::error("日志收集失败", [
            'collector' => $this->collectorName,
            'error' => $e->getMessage()
        ]);
        return 0;
    }
}

新的collectById方法

private function collectById(): int
{
    $lastProcessedId = $this->getLastProcessedId();
    $records = $this->getNewRecords($lastProcessedId);

    if ($records->isEmpty()) {
        return 0;
    }

    $userLogs = [];
    foreach ($records as $record) {
        $userLogData = $this->convertToUserLog($record);
        if ($userLogData) {
            $userLogs[] = $userLogData;
        }
    }

    if (!empty($userLogs)) {
        // 按原始时间排序后批量保存
        usort($userLogs, function($a, $b) {
            return strtotime($a['original_time']) <=> strtotime($b['original_time']);
        });
        
        UserLogService::batchLog($userLogs);
    }

    return count($userLogs);
}

3. 移除时间戳相关代码

移除的方法

  • getNewRecordsByTime()
  • getRecordTimestamp()
  • getOriginalRecordTimestamp()
  • getLastProcessedTimestamp()
  • updateLastProcessedTimestamp()

移除的功能

  • 重置功能(基于ID追踪无需重置)
  • 时间戳缓存机制
  • 复杂的时间线处理逻辑

4. 更新UserLogCollectorManager

private function registerCollectors(): void
{
    $this->collectors = [
        'fund' => new FundLogCollector(),
        'item' => new ItemLogCollector(),
        'farm_harvest' => new FarmHarvestLogCollector(),
        'farm_upgrade' => new FarmUpgradeLogCollector(),
        'point' => new PointLogCollector(),
    ];
}

技术优势

ID追踪 vs 时间戳追踪对比

特性 基于ID追踪 基于时间追踪
数据完整性 ✅ 保证不遗漏 ❌ 可能遗漏相同时间戳的记录
时钟容错 ✅ 不受时钟调整影响 ❌ 受服务器时间影响
并发安全 ✅ 自增ID唯一 ❌ 并发时时间戳可能重复
恢复能力 ✅ 可精确断点续传 ❌ 时间重叠可能重复处理
实现复杂度 ✅ 简单直观 ❌ 需要复杂的时间处理

架构优势

  1. 职责单一:每个收集器只处理一个表
  2. 逻辑简化:避免复杂的多表查询和合并
  3. 易于维护:独立的收集器便于调试和扩展
  4. 性能提升:减少复杂查询,提高处理效率

保持的功能

双时间戳设计

  • original_time: 业务发生的原始时间(用于排序显示)
  • collected_at: 日志收集时间(用于监控追踪)

原时间排序

  • 收集到的日志按原始时间排序
  • 确保用户看到的顺序与业务发生顺序一致

批量处理

  • 继续使用批量插入提高性能
  • 保持2秒延迟收集策略

测试验证

数据完整性验证

-- 验证收集进度
SELECT 
    source_table,
    source_type,
    COUNT(*) as collected_count,
    MAX(source_id) as max_source_id
FROM kku_user_logs 
GROUP BY source_table, source_type;

时间排序验证

-- 验证时间排序
SELECT 
    id,
    message,
    original_time,
    collected_at,
    TIMESTAMPDIFF(SECOND, original_time, collected_at) as delay_seconds
FROM kku_user_logs 
ORDER BY original_time DESC 
LIMIT 10;

总结

通过这次重构,用户日志收集系统实现了:

  1. 架构简化:每个表独立收集器,职责清晰
  2. 数据完整性:基于ID追踪,确保不遗漏记录
  3. 系统稳定性:移除复杂的时间处理逻辑
  4. 易于扩展:新增表只需创建对应收集器

这个新架构更加健壮、可靠,为后续的功能扩展奠定了良好的基础。