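Time: 2025-06-13 11:11:58
Task: Refactor the user log collector architecture so that each source table has its own independent collector, with progress tracked by ID
User feedback: the collector definitions are wrong. There should be one collector per source table rather than one per module. In addition, judging collection progress by time can miss data, so progress should be judged by auto-increment ID instead.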
farm_harvest_logs and farm_upgrade_logs
Before the refactor (one collector per module):
FundLogCollector → fund_logs
ItemLogCollector → item_transaction_logs
FarmLogCollector → farm_harvest_logs + farm_upgrade_logs (complex)
PointLogCollector → point_logs
After the refactor (one collector per source table):
FundLogCollector → fund_logs
ItemLogCollector → item_transaction_logs
FarmHarvestLogCollector → farm_harvest_logs (independent)
FarmUpgradeLogCollector → farm_upgrade_logs (independent)
PointLogCollector → point_logs
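
The one-collector-per-table layout in the list above can be sketched roughly as follows. This is a minimal, framework-free illustration, not the project's actual classes: `BaseTableCollector`, `sourceTable()` and `describe()` are hypothetical names introduced here, and only the two farm collector class names and their table names come from the notes.

```php
<?php
declare(strict_types=1);

// Hypothetical base class: each concrete collector binds to exactly one source table.
abstract class BaseTableCollector
{
    // Name of the single source table this collector reads from.
    abstract public function sourceTable(): string;

    // Human-readable mapping, e.g. "FarmHarvestLogCollector → farm_harvest_logs".
    public function describe(): string
    {
        return static::class . ' → ' . $this->sourceTable();
    }
}

final class FarmHarvestLogCollector extends BaseTableCollector
{
    public function sourceTable(): string
    {
        return 'farm_harvest_logs';
    }
}

final class FarmUpgradeLogCollector extends BaseTableCollector
{
    public function sourceTable(): string
    {
        return 'farm_upgrade_logs';
    }
}

$collectors = [new FarmHarvestLogCollector(), new FarmUpgradeLogCollector()];
$tables = array_map(fn (BaseTableCollector $c): string => $c->sourceTable(), $collectors);

foreach ($collectors as $collector) {
    echo $collector->describe(), PHP_EOL;
}
```

Because each class names exactly one table, each table can keep its own ID cursor, which a combined FarmLogCollector reading two tables could not do with a single value.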
Old approach (timestamp):

    $lastTimestamp = $this->getLastProcessedTimestamp();
    $records = $sourceModel::where('created_at', '>', $lastTimestamp)
        ->orderBy('created_at')
        ->limit($this->maxRecords)
        ->get();

New approach (ID):

    $lastProcessedId = $this->getLastProcessedId();
    $records = $sourceModel::where('id', '>', $lastProcessedId)
        ->orderBy('id')
        ->limit($this->maxRecords)
        ->get();
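
To see why the timestamp cursor can lose rows, the two queries can be emulated against an in-memory array. The sketch below is illustrative only: `collectAll()` and the sample rows are made up for this example, and a batch limit of 2 stands in for `$this->maxRecords`. Three rows share the same `created_at` value, so a batch boundary falls in the middle of them.

```php
<?php
declare(strict_types=1);

// In-memory stand-in for a source table; three rows share the same created_at value.
$rows = [
    ['id' => 1, 'created_at' => 100],
    ['id' => 2, 'created_at' => 100],
    ['id' => 3, 'created_at' => 100],
    ['id' => 4, 'created_at' => 101],
    ['id' => 5, 'created_at' => 102],
];

// Emulates "WHERE $column > $cursor ORDER BY $column LIMIT $limit", repeated until empty.
function collectAll(array $rows, string $column, int $limit): array
{
    $cursor = 0;
    $seenIds = [];
    while (true) {
        $batch = array_filter($rows, fn (array $r): bool => $r[$column] > $cursor);
        usort($batch, fn (array $a, array $b): int => $a[$column] <=> $b[$column]);
        $batch = array_slice($batch, 0, $limit);
        if ($batch === []) {
            return $seenIds;
        }
        foreach ($batch as $row) {
            $seenIds[] = $row['id'];
        }
        // Advance the cursor to the last value seen in this batch.
        $cursor = $batch[count($batch) - 1][$column];
    }
}

$byTime = collectAll($rows, 'created_at', 2);
$byId = collectAll($rows, 'id', 2);

echo 'timestamp cursor: ', implode(',', $byTime), PHP_EOL;
echo 'id cursor:        ', implode(',', $byId), PHP_EOL;
```

With the timestamp cursor, the first batch takes two of the three rows stamped 100, the cursor moves to 100, and the strict `>` comparison then excludes the third row permanently. The ID cursor returns all five rows because no two rows share an ID.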
farm_harvest_logs table
farm_upgrade_logs table

    public function collect(): int
    {
        try {
            // Track progress by ID so that no records are missed
            return $this->collectById();
        } catch (\Exception $e) {
            Log::error("Log collection failed", [
                'collector' => $this->collectorName,
                'error' => $e->getMessage()
            ]);
            return 0;
        }
    }

    private function collectById(): int
    {
        $lastProcessedId = $this->getLastProcessedId();
        $records = $this->getNewRecords($lastProcessedId);
        if ($records->isEmpty()) {
            return 0;
        }
        $userLogs = [];
        foreach ($records as $record) {
            $userLogData = $this->convertToUserLog($record);
            if ($userLogData) {
                $userLogs[] = $userLogData;
            }
        }
        if (!empty($userLogs)) {
            // Sort by original time, then save everything in one batch
            usort($userLogs, function($a, $b) {
                return strtotime($a['original_time']) <=> strtotime($b['original_time']);
            });
            UserLogService::batchLog($userLogs);
        }
        return count($userLogs);
    }
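
The convert, filter and sort steps of `collectById()` can be tried out without the framework. The sketch below is a hedged illustration: the `amount` field, the message text and this standalone `convertToUserLog()` are invented for the example. It also shows one consequence of sorting by `original_time`: after the sort, the last saved entry is not necessarily the one with the highest ID. How the real collector stores its cursor is not shown in these notes, so taking the highest ID that was read is an assumption here.

```php
<?php
declare(strict_types=1);

// Hypothetical converter: returns null for records that should not become user logs.
function convertToUserLog(array $record): ?array
{
    if ($record['amount'] === 0) {
        return null; // nothing worth showing to the user
    }
    return [
        'source_id' => $record['id'],
        'message' => 'Harvested ' . $record['amount'] . ' crops',
        'original_time' => $record['created_at'],
    ];
}

// Records arrive ordered by ID, but their timestamps are not necessarily monotonic.
$records = [
    ['id' => 11, 'amount' => 5, 'created_at' => '2025-06-13 10:00:02'],
    ['id' => 12, 'amount' => 0, 'created_at' => '2025-06-13 10:00:03'],
    ['id' => 13, 'amount' => 3, 'created_at' => '2025-06-13 10:00:01'],
];

$userLogs = [];
foreach ($records as $record) {
    $userLog = convertToUserLog($record);
    if ($userLog !== null) {
        $userLogs[] = $userLog;
    }
}

// Sort by original time before saving, mirroring the usort() call above.
usort(
    $userLogs,
    fn (array $a, array $b): int => strtotime($a['original_time']) <=> strtotime($b['original_time'])
);

$savedSourceIds = array_column($userLogs, 'source_id');

// Assumption: the next cursor is the highest ID that was read, not the last entry saved.
$nextCursor = max(array_column($records, 'id'));

echo implode(',', $savedSourceIds), ' next cursor: ', $nextCursor, PHP_EOL;
```

Record 12 is dropped by the converter and record 13 sorts ahead of record 11, so the saved order is 13, 11 while the cursor still advances to 13.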
getNewRecordsByTime()
getRecordTimestamp()
getOriginalRecordTimestamp()
getLastProcessedTimestamp()
updateLastProcessedTimestamp()

    private function registerCollectors(): void
    {
        $this->collectors = [
            'fund' => new FundLogCollector(),
            'item' => new ItemLogCollector(),
            'farm_harvest' => new FarmHarvestLogCollector(),
            'farm_upgrade' => new FarmUpgradeLogCollector(),
            'point' => new PointLogCollector(),
        ];
    }
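
A registry like this is typically consumed by looping over the collectors and summing what each one returns. The notes do not show that loop, so the following is only a sketch under assumptions: the `Collector` interface, `StubCollector` and `collectAllSources()` are hypothetical, and the code needs PHP 8.0 or later for constructor property promotion. It illustrates keeping one failing source table from blocking the others.

```php
<?php
declare(strict_types=1);

// Hypothetical minimal contract; the real collectors expose collect(): int as shown above.
interface Collector
{
    public function collect(): int;
}

// Stub collector returning a fixed count, or throwing to simulate a failure.
final class StubCollector implements Collector
{
    public function __construct(private int $count, private bool $fails = false)
    {
    }

    public function collect(): int
    {
        if ($this->fails) {
            throw new RuntimeException('source table unavailable');
        }
        return $this->count;
    }
}

// Runs every collector; a failure is recorded as 0 and does not stop the rest.
function collectAllSources(array $collectors): array
{
    $results = [];
    foreach ($collectors as $name => $collector) {
        try {
            $results[$name] = $collector->collect();
        } catch (Throwable $e) {
            error_log("collector {$name} failed: " . $e->getMessage());
            $results[$name] = 0;
        }
    }
    return $results;
}

$results = collectAllSources([
    'fund' => new StubCollector(3),
    'farm_harvest' => new StubCollector(0, true),
    'farm_upgrade' => new StubCollector(2),
]);

echo array_sum($results), PHP_EOL;
```

Keeping per-collector counts keyed by name, rather than only a total, makes it easy to see which source table contributed nothing on a given run.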
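| Feature | ID-based tracking | Time-based tracking |
|---|---|---|
| Data integrity | ✅ No records missed | ❌ May miss records that share a timestamp |
| Clock tolerance | ✅ Unaffected by clock adjustments | ❌ Affected by server time |
| Concurrency safety | ✅ Auto-increment IDs are unique | ❌ Timestamps may collide under concurrency |
| Recovery | ✅ Can resume exactly from the last processed ID | ❌ Overlapping time ranges may cause reprocessing |
| Implementation complexity | ✅ Simple and direct | ❌ Requires complex time handling |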

    -- Verify collection progress
    SELECT
        source_table,
        source_type,
        COUNT(*) as collected_count,
        MAX(source_id) as max_source_id
    FROM kku_user_logs
    GROUP BY source_table, source_type;

    -- Verify time ordering
    SELECT
        id,
        message,
        original_time,
        collected_at,
        TIMESTAMPDIFF(SECOND, original_time, collected_at) as delay_seconds
    FROM kku_user_logs
    ORDER BY original_time DESC
    LIMIT 10;

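With this refactor, the user log collection system now provides:
The new architecture is more robust and reliable, and it lays a solid foundation for future feature extensions.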