Просмотр исходного кода

Transfer模块第四阶段开发完成 - 队列任务和命令行工具

第四阶段:队列任务和命令行工具开发
- 队列任务:创建ProcessTransferOrderJob和SendCallbackJob两个核心队列任务
- 命令行工具:创建TransferProcessCommand和TransferStatsCommand两个管理命令
- 事件系统:创建TransferOrderCreated和TransferOrderCompleted事件

队列任务功能:
- ProcessTransferOrderJob:支持订单处理、状态查询、重试等多种操作
- SendCallbackJob:智能回调发送,支持指数退避重试机制
- 完善的错误处理和失败恢复机制
- 支持任务唯一性和延迟调度

命令行工具:
- TransferProcessCommand:批量处理待处理订单和回调,支持队列和直接处理模式
- TransferStatsCommand:完整的统计报告,支持多种时间周期和数据导出
- 丰富的命令行参数和交互式界面

事件系统:
- TransferOrderCreated:订单创建事件,包含完整的订单信息和上下文
- TransferOrderCompleted:订单完成事件,支持状态变更跟踪和处理时长统计
- 支持事件数据序列化和广播

技术特点:
- 支持Laravel队列系统的完整特性
- 智能重试机制和错误恢复
- 详细的日志记录和监控
- 命令行工具支持试运行和批量操作
- 事件驱动架构支持解耦和扩展

下一阶段:后台管理界面开发
notfff 7 месяцев назад
Родитель
Сommit
ed45b75e

+ 231 - 0
app/Module/Transfer/Commands/TransferProcessCommand.php

@@ -0,0 +1,231 @@
+<?php
+
+namespace App\Module\Transfer\Commands;
+
+use App\Module\Transfer\Logics\OrderLogic;
+use App\Module\Transfer\Logics\CallbackLogic;
+use App\Module\Transfer\Models\TransferOrder;
+use App\Module\Transfer\Enums\TransferStatus;
+use App\Module\Transfer\Jobs\ProcessTransferOrderJob;
+use App\Module\Transfer\Jobs\SendCallbackJob;
+use Illuminate\Console\Command;
+
+/**
+ * Transfer订单处理命令
+ */
+class TransferProcessCommand extends Command
+{
+    /**
+     * 命令签名
+     */
+    protected $signature = 'transfer:process 
+                            {--type=all : 处理类型 (all|orders|callbacks)}
+                            {--limit=100 : 处理数量限制}
+                            {--queue : 使用队列处理}
+                            {--dry-run : 仅显示将要处理的订单,不实际执行}';
+
+    /**
+     * 命令描述
+     */
+    protected $description = '处理Transfer模块的待处理订单和回调';
+
+    /**
+     * 执行命令
+     */
+    public function handle(): int
+    {
+        $type = $this->option('type');
+        $limit = (int) $this->option('limit');
+        $useQueue = $this->option('queue');
+        $dryRun = $this->option('dry-run');
+
+        $this->info("开始处理Transfer订单 (类型: {$type}, 限制: {$limit})");
+
+        if ($dryRun) {
+            $this->warn('*** 这是一次试运行,不会实际处理订单 ***');
+        }
+
+        $totalProcessed = 0;
+
+        try {
+            switch ($type) {
+                case 'orders':
+                    $totalProcessed = $this->processOrders($limit, $useQueue, $dryRun);
+                    break;
+                case 'callbacks':
+                    $totalProcessed = $this->processCallbacks($limit, $useQueue, $dryRun);
+                    break;
+                case 'all':
+                default:
+                    $ordersProcessed = $this->processOrders($limit, $useQueue, $dryRun);
+                    $callbacksProcessed = $this->processCallbacks($limit - $ordersProcessed, $useQueue, $dryRun);
+                    $totalProcessed = $ordersProcessed + $callbacksProcessed;
+                    break;
+            }
+
+            $this->info("处理完成,共处理 {$totalProcessed} 个项目");
+            return 0;
+
+        } catch (\Exception $e) {
+            $this->error("处理失败: " . $e->getMessage());
+            return 1;
+        }
+    }
+
+    /**
+     * 处理待处理的订单
+     */
+    private function processOrders(int $limit, bool $useQueue, bool $dryRun): int
+    {
+        $this->line('正在查找待处理的订单...');
+
+        // 查找需要处理的订单
+        $orders = TransferOrder::where('status', TransferStatus::PROCESSING)
+            ->where('created_at', '>', now()->subHours(24))
+            ->orderBy('created_at')
+            ->limit($limit)
+            ->get();
+
+        if ($orders->isEmpty()) {
+            $this->info('没有找到待处理的订单');
+            return 0;
+        }
+
+        $this->info("找到 {$orders->count()} 个待处理的订单");
+
+        if ($dryRun) {
+            $this->table(
+                ['ID', '类型', '状态', '应用', '金额', '创建时间'],
+                $orders->map(function ($order) {
+                    return [
+                        $order->id,
+                        $order->type->getDescription(),
+                        $order->status->getDescription(),
+                        $order->transferApp->title,
+                        $order->amount,
+                        $order->created_at->format('Y-m-d H:i:s')
+                    ];
+                })
+            );
+            return $orders->count();
+        }
+
+        $processed = 0;
+        $progressBar = $this->output->createProgressBar($orders->count());
+        $progressBar->start();
+
+        foreach ($orders as $order) {
+            try {
+                if ($useQueue) {
+                    // 使用队列处理
+                    ProcessTransferOrderJob::dispatch($order->id, 'query');
+                    $this->line(" 订单 {$order->id} 已加入队列");
+                } else {
+                    // 直接处理
+                    if ($order->isTransferOut()) {
+                        $success = OrderLogic::queryTransferOutStatus($order);
+                    } else {
+                        $success = OrderLogic::queryTransferInStatus($order);
+                    }
+
+                    if ($success) {
+                        $this->line(" 订单 {$order->id} 处理成功");
+                    } else {
+                        $this->line(" 订单 {$order->id} 处理失败");
+                    }
+                }
+
+                $processed++;
+                $progressBar->advance();
+
+            } catch (\Exception $e) {
+                $this->error(" 订单 {$order->id} 处理异常: " . $e->getMessage());
+            }
+        }
+
+        $progressBar->finish();
+        $this->line('');
+
+        return $processed;
+    }
+
+    /**
+     * 处理待回调的订单
+     */
+    private function processCallbacks(int $limit, bool $useQueue, bool $dryRun): int
+    {
+        if ($limit <= 0) {
+            return 0;
+        }
+
+        $this->line('正在查找待回调的订单...');
+
+        // 查找需要回调的订单
+        $orders = TransferOrder::where('status', TransferStatus::PROCESSING)
+            ->whereHas('transferApp', function ($query) {
+                $query->whereNotNull('order_callback_url');
+            })
+            ->where('created_at', '>', now()->subHours(24))
+            ->where('created_at', '<', now()->subMinutes(5)) // 5分钟前创建的订单
+            ->orderBy('created_at')
+            ->limit($limit)
+            ->get();
+
+        if ($orders->isEmpty()) {
+            $this->info('没有找到待回调的订单');
+            return 0;
+        }
+
+        $this->info("找到 {$orders->count()} 个待回调的订单");
+
+        if ($dryRun) {
+            $this->table(
+                ['ID', '类型', '状态', '回调URL', '创建时间'],
+                $orders->map(function ($order) {
+                    return [
+                        $order->id,
+                        $order->type->getDescription(),
+                        $order->status->getDescription(),
+                        $order->transferApp->order_callback_url,
+                        $order->created_at->format('Y-m-d H:i:s')
+                    ];
+                })
+            );
+            return $orders->count();
+        }
+
+        $processed = 0;
+        $progressBar = $this->output->createProgressBar($orders->count());
+        $progressBar->start();
+
+        foreach ($orders as $order) {
+            try {
+                if ($useQueue) {
+                    // 使用队列处理
+                    SendCallbackJob::dispatch($order->id);
+                    $this->line(" 订单 {$order->id} 回调已加入队列");
+                } else {
+                    // 直接处理
+                    $success = CallbackLogic::sendCallback($order);
+                    
+                    if ($success) {
+                        $this->line(" 订单 {$order->id} 回调发送成功");
+                    } else {
+                        $this->line(" 订单 {$order->id} 回调发送失败");
+                    }
+                }
+
+                $processed++;
+                $progressBar->advance();
+
+            } catch (\Exception $e) {
+                $this->error(" 订单 {$order->id} 回调异常: " . $e->getMessage());
+            }
+        }
+
+        $progressBar->finish();
+        $this->line('');
+
+        return $processed;
+    }
+}

+ 275 - 0
app/Module/Transfer/Commands/TransferStatsCommand.php

@@ -0,0 +1,275 @@
+<?php
+
+namespace App\Module\Transfer\Commands;
+
+use App\Module\Transfer\Models\TransferOrder;
+use App\Module\Transfer\Models\TransferApp;
+use App\Module\Transfer\Enums\TransferStatus;
+use App\Module\Transfer\Enums\TransferType;
+use Illuminate\Console\Command;
+use Illuminate\Support\Facades\DB;
+
+/**
+ * Transfer统计命令
+ */
+class TransferStatsCommand extends Command
+{
+    /**
+     * 命令签名
+     */
+    protected $signature = 'transfer:stats 
+                            {--period=today : 统计周期 (today|yesterday|week|month)}
+                            {--app= : 指定应用ID}
+                            {--export= : 导出到文件}';
+
+    /**
+     * 命令描述
+     */
+    protected $description = '显示Transfer模块的统计信息';
+
+    /**
+     * 执行命令
+     */
+    public function handle(): int
+    {
+        $period = $this->option('period');
+        $appId = $this->option('app');
+        $exportFile = $this->option('export');
+
+        $this->info("Transfer模块统计报告 - {$period}");
+        $this->line(str_repeat('=', 60));
+
+        try {
+            // 获取时间范围
+            [$startDate, $endDate] = $this->getDateRange($period);
+            
+            $this->info("统计时间: {$startDate} 至 {$endDate}");
+            $this->line('');
+
+            // 基础统计
+            $this->showBasicStats($startDate, $endDate, $appId);
+            
+            // 按应用统计
+            $this->showAppStats($startDate, $endDate, $appId);
+            
+            // 按状态统计
+            $this->showStatusStats($startDate, $endDate, $appId);
+            
+            // 按小时统计(仅当日)
+            if ($period === 'today') {
+                $this->showHourlyStats($startDate, $endDate, $appId);
+            }
+
+            // 导出数据
+            if ($exportFile) {
+                $this->exportStats($startDate, $endDate, $appId, $exportFile);
+            }
+
+            return 0;
+
+        } catch (\Exception $e) {
+            $this->error("统计失败: " . $e->getMessage());
+            return 1;
+        }
+    }
+
+    /**
+     * 获取日期范围
+     */
+    private function getDateRange(string $period): array
+    {
+        return match ($period) {
+            'today' => [
+                now()->startOfDay()->toDateTimeString(),
+                now()->endOfDay()->toDateTimeString()
+            ],
+            'yesterday' => [
+                now()->subDay()->startOfDay()->toDateTimeString(),
+                now()->subDay()->endOfDay()->toDateTimeString()
+            ],
+            'week' => [
+                now()->startOfWeek()->toDateTimeString(),
+                now()->endOfWeek()->toDateTimeString()
+            ],
+            'month' => [
+                now()->startOfMonth()->toDateTimeString(),
+                now()->endOfMonth()->toDateTimeString()
+            ],
+            default => throw new \InvalidArgumentException("无效的统计周期: {$period}")
+        };
+    }
+
+    /**
+     * 显示基础统计
+     */
+    private function showBasicStats(string $startDate, string $endDate, ?string $appId): void
+    {
+        $query = TransferOrder::whereBetween('created_at', [$startDate, $endDate]);
+        
+        if ($appId) {
+            $query->where('transfer_app_id', $appId);
+        }
+
+        $totalOrders = $query->count();
+        $totalAmount = $query->sum('amount');
+        
+        $inOrders = (clone $query)->where('type', TransferType::IN)->count();
+        $outOrders = (clone $query)->where('type', TransferType::OUT)->count();
+        
+        $inAmount = (clone $query)->where('type', TransferType::IN)->sum('amount');
+        $outAmount = (clone $query)->where('type', TransferType::OUT)->sum('amount');
+        
+        $completedOrders = (clone $query)->where('status', TransferStatus::COMPLETED)->count();
+        $failedOrders = (clone $query)->where('status', TransferStatus::FAILED)->count();
+
+        $this->info('基础统计:');
+        $this->table(
+            ['指标', '数值'],
+            [
+                ['总订单数', number_format($totalOrders)],
+                ['总金额', number_format($totalAmount, 2)],
+                ['转入订单', number_format($inOrders)],
+                ['转出订单', number_format($outOrders)],
+                ['转入金额', number_format($inAmount, 2)],
+                ['转出金额', number_format($outAmount, 2)],
+                ['成功订单', number_format($completedOrders)],
+                ['失败订单', number_format($failedOrders)],
+                ['成功率', $totalOrders > 0 ? number_format($completedOrders / $totalOrders * 100, 2) . '%' : '0%'],
+            ]
+        );
+        $this->line('');
+    }
+
+    /**
+     * 显示按应用统计
+     */
+    private function showAppStats(string $startDate, string $endDate, ?string $appId): void
+    {
+        $query = DB::table('transfer_orders as o')
+            ->join('transfer_apps as a', 'o.transfer_app_id', '=', 'a.id')
+            ->whereBetween('o.created_at', [$startDate, $endDate])
+            ->select([
+                'a.title as app_name',
+                DB::raw('COUNT(*) as total_orders'),
+                DB::raw('SUM(o.amount) as total_amount'),
+                DB::raw('SUM(CASE WHEN o.type = 1 THEN 1 ELSE 0 END) as in_orders'),
+                DB::raw('SUM(CASE WHEN o.type = 2 THEN 1 ELSE 0 END) as out_orders'),
+                DB::raw('SUM(CASE WHEN o.status = 100 THEN 1 ELSE 0 END) as completed_orders'),
+            ])
+            ->groupBy('a.id', 'a.title')
+            ->orderBy('total_amount', 'desc');
+
+        if ($appId) {
+            $query->where('a.id', $appId);
+        }
+
+        $stats = $query->get();
+
+        if ($stats->isNotEmpty()) {
+            $this->info('按应用统计:');
+            $this->table(
+                ['应用名称', '总订单', '总金额', '转入', '转出', '成功', '成功率'],
+                $stats->map(function ($stat) {
+                    $successRate = $stat->total_orders > 0 
+                        ? number_format($stat->completed_orders / $stat->total_orders * 100, 1) . '%'
+                        : '0%';
+                    
+                    return [
+                        $stat->app_name,
+                        number_format($stat->total_orders),
+                        number_format($stat->total_amount, 2),
+                        number_format($stat->in_orders),
+                        number_format($stat->out_orders),
+                        number_format($stat->completed_orders),
+                        $successRate,
+                    ];
+                })
+            );
+            $this->line('');
+        }
+    }
+
+    /**
+     * 显示按状态统计
+     */
+    private function showStatusStats(string $startDate, string $endDate, ?string $appId): void
+    {
+        $query = TransferOrder::whereBetween('created_at', [$startDate, $endDate])
+            ->select([
+                'status',
+                DB::raw('COUNT(*) as count'),
+                DB::raw('SUM(amount) as amount')
+            ])
+            ->groupBy('status')
+            ->orderBy('status');
+
+        if ($appId) {
+            $query->where('transfer_app_id', $appId);
+        }
+
+        $stats = $query->get();
+
+        if ($stats->isNotEmpty()) {
+            $this->info('按状态统计:');
+            $this->table(
+                ['状态', '订单数', '金额'],
+                $stats->map(function ($stat) {
+                    $status = TransferStatus::from($stat->status);
+                    return [
+                        $status->getDescription(),
+                        number_format($stat->count),
+                        number_format($stat->amount, 2),
+                    ];
+                })
+            );
+            $this->line('');
+        }
+    }
+
+    /**
+     * 显示按小时统计
+     */
+    private function showHourlyStats(string $startDate, string $endDate, ?string $appId): void
+    {
+        $query = TransferOrder::whereBetween('created_at', [$startDate, $endDate])
+            ->select([
+                DB::raw('HOUR(created_at) as hour'),
+                DB::raw('COUNT(*) as count'),
+                DB::raw('SUM(amount) as amount')
+            ])
+            ->groupBy(DB::raw('HOUR(created_at)'))
+            ->orderBy('hour');
+
+        if ($appId) {
+            $query->where('transfer_app_id', $appId);
+        }
+
+        $stats = $query->get();
+
+        if ($stats->isNotEmpty()) {
+            $this->info('按小时统计:');
+            $this->table(
+                ['小时', '订单数', '金额'],
+                $stats->map(function ($stat) {
+                    return [
+                        sprintf('%02d:00', $stat->hour),
+                        number_format($stat->count),
+                        number_format($stat->amount, 2),
+                    ];
+                })
+            );
+            $this->line('');
+        }
+    }
+
+    /**
+     * 导出统计数据
+     */
+    private function exportStats(string $startDate, string $endDate, ?string $appId, string $filename): void
+    {
+        $this->info("正在导出统计数据到 {$filename}...");
+        
+        // 这里可以实现CSV或Excel导出逻辑
+        $this->info('导出功能待实现');
+    }
+}

+ 104 - 0
app/Module/Transfer/Events/TransferOrderCompleted.php

@@ -0,0 +1,104 @@
+<?php
+
+namespace App\Module\Transfer\Events;
+
+use App\Module\Transfer\Models\TransferOrder;
+use App\Module\Transfer\Enums\TransferStatus;
+use Illuminate\Broadcasting\InteractsWithSockets;
+use Illuminate\Foundation\Events\Dispatchable;
+use Illuminate\Queue\SerializesModels;
+
+/**
+ * 划转订单完成事件
+ */
+class TransferOrderCompleted
+{
+    use Dispatchable, InteractsWithSockets, SerializesModels;
+
+    public TransferOrder $order;
+    public TransferStatus $previousStatus;
+    public array $context;
+
+    /**
+     * 创建事件实例
+     */
+    public function __construct(TransferOrder $order, TransferStatus $previousStatus, array $context = [])
+    {
+        $this->order = $order;
+        $this->previousStatus = $previousStatus;
+        $this->context = $context;
+    }
+
+    /**
+     * 获取订单信息
+     */
+    public function getOrder(): TransferOrder
+    {
+        return $this->order;
+    }
+
+    /**
+     * 获取之前的状态
+     */
+    public function getPreviousStatus(): TransferStatus
+    {
+        return $this->previousStatus;
+    }
+
+    /**
+     * 获取上下文信息
+     */
+    public function getContext(): array
+    {
+        return $this->context;
+    }
+
+    /**
+     * 判断是否为转入订单
+     */
+    public function isTransferIn(): bool
+    {
+        return $this->order->isTransferIn();
+    }
+
+    /**
+     * 判断是否为转出订单
+     */
+    public function isTransferOut(): bool
+    {
+        return $this->order->isTransferOut();
+    }
+
+    /**
+     * 获取处理时长(秒)
+     */
+    public function getProcessingDuration(): int
+    {
+        if ($this->order->completed_at && $this->order->created_at) {
+            return $this->order->completed_at->diffInSeconds($this->order->created_at);
+        }
+        
+        return 0;
+    }
+
+    /**
+     * 获取事件数据
+     */
+    public function toArray(): array
+    {
+        return [
+            'order_id' => $this->order->id,
+            'transfer_app_id' => $this->order->transfer_app_id,
+            'user_id' => $this->order->user_id,
+            'type' => $this->order->type->value,
+            'amount' => $this->order->amount,
+            'out_amount' => $this->order->out_amount,
+            'previous_status' => $this->previousStatus->value,
+            'current_status' => $this->order->status->value,
+            'processing_duration' => $this->getProcessingDuration(),
+            'created_at' => $this->order->created_at->toDateTimeString(),
+            'completed_at' => $this->order->completed_at?->toDateTimeString(),
+            'context' => $this->context,
+        ];
+    }
+}

+ 78 - 0
app/Module/Transfer/Events/TransferOrderCreated.php

@@ -0,0 +1,78 @@
+<?php
+
+namespace App\Module\Transfer\Events;
+
+use App\Module\Transfer\Models\TransferOrder;
+use Illuminate\Broadcasting\InteractsWithSockets;
+use Illuminate\Foundation\Events\Dispatchable;
+use Illuminate\Queue\SerializesModels;
+
+/**
+ * 划转订单创建事件
+ */
+class TransferOrderCreated
+{
+    use Dispatchable, InteractsWithSockets, SerializesModels;
+
+    public TransferOrder $order;
+    public array $context;
+
+    /**
+     * 创建事件实例
+     */
+    public function __construct(TransferOrder $order, array $context = [])
+    {
+        $this->order = $order;
+        $this->context = $context;
+    }
+
+    /**
+     * 获取订单信息
+     */
+    public function getOrder(): TransferOrder
+    {
+        return $this->order;
+    }
+
+    /**
+     * 获取上下文信息
+     */
+    public function getContext(): array
+    {
+        return $this->context;
+    }
+
+    /**
+     * 判断是否为转入订单
+     */
+    public function isTransferIn(): bool
+    {
+        return $this->order->isTransferIn();
+    }
+
+    /**
+     * 判断是否为转出订单
+     */
+    public function isTransferOut(): bool
+    {
+        return $this->order->isTransferOut();
+    }
+
+    /**
+     * 获取事件数据
+     */
+    public function toArray(): array
+    {
+        return [
+            'order_id' => $this->order->id,
+            'transfer_app_id' => $this->order->transfer_app_id,
+            'user_id' => $this->order->user_id,
+            'type' => $this->order->type->value,
+            'amount' => $this->order->amount,
+            'out_amount' => $this->order->out_amount,
+            'status' => $this->order->status->value,
+            'created_at' => $this->order->created_at->toDateTimeString(),
+            'context' => $this->context,
+        ];
+    }
+}

+ 260 - 0
app/Module/Transfer/Jobs/ProcessTransferOrderJob.php

@@ -0,0 +1,260 @@
+<?php
+
+namespace App\Module\Transfer\Jobs;
+
+use App\Module\Transfer\Logics\OrderLogic;
+use App\Module\Transfer\Models\TransferOrder;
+use App\Module\Transfer\Enums\TransferStatus;
+use App\Module\Transfer\Enums\TransferType;
+use Illuminate\Bus\Queueable;
+use Illuminate\Contracts\Queue\ShouldQueue;
+use Illuminate\Foundation\Bus\Dispatchable;
+use Illuminate\Queue\InteractsWithQueue;
+use Illuminate\Queue\SerializesModels;
+use Illuminate\Support\Facades\Log;
+
+/**
+ * 处理划转订单任务
+ */
+class ProcessTransferOrderJob implements ShouldQueue
+{
+    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
+
+    private int $orderId;
+    private string $action;
+
+    /**
+     * 任务最大尝试次数
+     */
+    public int $tries = 3;
+
+    /**
+     * 任务超时时间(秒)
+     */
+    public int $timeout = 120;
+
+    /**
+     * 创建任务实例
+     */
+    public function __construct(int $orderId, string $action = 'process')
+    {
+        $this->orderId = $orderId;
+        $this->action = $action;
+        
+        // 设置队列名称
+        $this->onQueue('transfer');
+    }
+
+    /**
+     * 执行任务
+     */
+    public function handle(): void
+    {
+        try {
+            $order = TransferOrder::find($this->orderId);
+            
+            if (!$order) {
+                Log::warning('Transfer order not found for processing', [
+                    'order_id' => $this->orderId,
+                    'action' => $this->action
+                ]);
+                return;
+            }
+
+            Log::info('Processing transfer order', [
+                'order_id' => $this->orderId,
+                'action' => $this->action,
+                'type' => $order->type->value,
+                'status' => $order->status->value
+            ]);
+
+            // 根据动作类型执行不同的处理
+            match ($this->action) {
+                'process' => $this->processOrder($order),
+                'query' => $this->queryOrderStatus($order),
+                'retry' => $this->retryOrder($order),
+                default => Log::warning('Unknown action for transfer order', [
+                    'order_id' => $this->orderId,
+                    'action' => $this->action
+                ])
+            };
+
+        } catch (\Exception $e) {
+            Log::error('Transfer order processing failed', [
+                'order_id' => $this->orderId,
+                'action' => $this->action,
+                'error' => $e->getMessage(),
+                'trace' => $e->getTraceAsString()
+            ]);
+
+            // 重新抛出异常以触发重试机制
+            throw $e;
+        }
+    }
+
+    /**
+     * 处理订单
+     */
+    private function processOrder(TransferOrder $order): void
+    {
+        // 检查订单状态
+        if ($order->isFinalStatus()) {
+            Log::info('Transfer order already in final status', [
+                'order_id' => $order->id,
+                'status' => $order->status->value
+            ]);
+            return;
+        }
+
+        // 根据订单类型处理
+        if ($order->isTransferOut()) {
+            $this->processTransferOut($order);
+        } else {
+            $this->processTransferIn($order);
+        }
+    }
+
+    /**
+     * 处理转出订单
+     */
+    private function processTransferOut(TransferOrder $order): void
+    {
+        $success = OrderLogic::processTransferOut($order);
+        
+        if ($success) {
+            Log::info('Transfer out order processed successfully', [
+                'order_id' => $order->id
+            ]);
+            
+            // 如果需要查询状态,延迟调度查询任务
+            if ($order->status === TransferStatus::PROCESSING) {
+                self::dispatch($order->id, 'query')
+                    ->delay(now()->addMinutes(2));
+            }
+        } else {
+            Log::error('Transfer out order processing failed', [
+                'order_id' => $order->id
+            ]);
+        }
+    }
+
+    /**
+     * 处理转入订单
+     */
+    private function processTransferIn(TransferOrder $order): void
+    {
+        // 转入订单主要是发送回调
+        if ($order->transferApp->supportsCallback()) {
+            SendCallbackJob::dispatch($order->id);
+        } else {
+            // 没有回调配置,直接完成
+            $order->updateStatus(TransferStatus::COMPLETED);
+        }
+    }
+
+    /**
+     * 查询订单状态
+     */
+    private function queryOrderStatus(TransferOrder $order): void
+    {
+        if ($order->isTransferOut()) {
+            $success = OrderLogic::queryTransferOutStatus($order);
+        } else {
+            $success = OrderLogic::queryTransferInStatus($order);
+        }
+
+        if ($success) {
+            Log::info('Transfer order status queried successfully', [
+                'order_id' => $order->id,
+                'new_status' => $order->fresh()->status->value
+            ]);
+        } else {
+            Log::warning('Transfer order status query failed', [
+                'order_id' => $order->id
+            ]);
+            
+            // 如果查询失败且订单仍在处理中,安排下次查询
+            if ($order->status === TransferStatus::PROCESSING) {
+                self::dispatch($order->id, 'query')
+                    ->delay(now()->addMinutes(5));
+            }
+        }
+    }
+
+    /**
+     * 重试订单
+     */
+    private function retryOrder(TransferOrder $order): void
+    {
+        if (!$order->canRetry()) {
+            Log::warning('Transfer order cannot be retried', [
+                'order_id' => $order->id,
+                'status' => $order->status->value
+            ]);
+            return;
+        }
+
+        // 重置状态并重新处理
+        $order->updateStatus(TransferStatus::CREATED);
+        $this->processOrder($order);
+    }
+
+    /**
+     * 任务失败时的处理
+     */
+    public function failed(\Throwable $exception): void
+    {
+        Log::error('Transfer order job failed permanently', [
+            'order_id' => $this->orderId,
+            'action' => $this->action,
+            'error' => $exception->getMessage(),
+            'attempts' => $this->attempts()
+        ]);
+
+        // 更新订单状态为失败
+        $order = TransferOrder::find($this->orderId);
+        if ($order && !$order->isFinalStatus()) {
+            $order->updateStatus(TransferStatus::FAILED, '任务处理失败: ' . $exception->getMessage());
+        }
+    }
+
+    /**
+     * 获取任务的唯一ID
+     */
+    public function uniqueId(): string
+    {
+        return "transfer_order_{$this->orderId}_{$this->action}";
+    }
+
+    /**
+     * 静态方法:调度处理订单任务
+     */
+    public static function scheduleProcess(int $orderId, int $delayMinutes = 0): void
+    {
+        $job = new self($orderId, 'process');
+        
+        if ($delayMinutes > 0) {
+            $job->delay(now()->addMinutes($delayMinutes));
+        }
+        
+        dispatch($job);
+    }
+
+    /**
+     * 静态方法:调度查询状态任务
+     */
+    public static function scheduleQuery(int $orderId, int $delayMinutes = 2): void
+    {
+        self::dispatch($orderId, 'query')
+            ->delay(now()->addMinutes($delayMinutes));
+    }
+
+    /**
+     * 静态方法:调度重试任务
+     */
+    public static function scheduleRetry(int $orderId, int $delayMinutes = 1): void
+    {
+        self::dispatch($orderId, 'retry')
+            ->delay(now()->addMinutes($delayMinutes));
+    }
+}

+ 222 - 0
app/Module/Transfer/Jobs/SendCallbackJob.php

@@ -0,0 +1,222 @@
+<?php
+
+namespace App\Module\Transfer\Jobs;
+
+use App\Module\Transfer\Logics\CallbackLogic;
+use App\Module\Transfer\Models\TransferOrder;
+use App\Module\Transfer\Enums\TransferStatus;
+use Illuminate\Bus\Queueable;
+use Illuminate\Contracts\Queue\ShouldQueue;
+use Illuminate\Foundation\Bus\Dispatchable;
+use Illuminate\Queue\InteractsWithQueue;
+use Illuminate\Queue\SerializesModels;
+use Illuminate\Support\Facades\Log;
+
+/**
+ * 发送回调通知任务
+ */
+class SendCallbackJob implements ShouldQueue
+{
+    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
+
+    private int $orderId;
+    private int $retryCount;
+    private int $maxRetries;
+
+    /**
+     * 任务最大尝试次数
+     */
+    public int $tries = 1; // 重试逻辑由业务控制
+
+    /**
+     * 任务超时时间(秒)
+     */
+    public int $timeout = 60;
+
+    /**
+     * 创建任务实例
+     */
+    public function __construct(int $orderId, int $retryCount = 0, int $maxRetries = 3)
+    {
+        $this->orderId = $orderId;
+        $this->retryCount = $retryCount;
+        $this->maxRetries = $maxRetries;
+        
+        // 设置队列名称
+        $this->onQueue('transfer_callback');
+    }
+
+    /**
+     * 执行任务
+     */
+    public function handle(): void
+    {
+        try {
+            $order = TransferOrder::find($this->orderId);
+            
+            if (!$order) {
+                Log::warning('Transfer order not found for callback', [
+                    'order_id' => $this->orderId
+                ]);
+                return;
+            }
+
+            // 检查订单状态
+            if ($order->isFinalStatus()) {
+                Log::info('Transfer order already in final status, skipping callback', [
+                    'order_id' => $order->id,
+                    'status' => $order->status->value
+                ]);
+                return;
+            }
+
+            // 检查是否支持回调
+            if (!$order->transferApp->supportsCallback()) {
+                Log::info('Transfer app does not support callback, completing order', [
+                    'order_id' => $order->id,
+                    'app_id' => $order->transfer_app_id
+                ]);
+                $order->updateStatus(TransferStatus::COMPLETED);
+                return;
+            }
+
+            Log::info('Sending transfer callback', [
+                'order_id' => $order->id,
+                'retry_count' => $this->retryCount,
+                'callback_url' => $order->transferApp->order_callback_url
+            ]);
+
+            // 发送回调
+            $success = CallbackLogic::sendCallback($order);
+
+            if ($success) {
+                Log::info('Transfer callback sent successfully', [
+                    'order_id' => $order->id,
+                    'retry_count' => $this->retryCount
+                ]);
+            } else {
+                $this->handleCallbackFailure($order);
+            }
+
+        } catch (\Exception $e) {
+            Log::error('Transfer callback job failed', [
+                'order_id' => $this->orderId,
+                'retry_count' => $this->retryCount,
+                'error' => $e->getMessage(),
+                'trace' => $e->getTraceAsString()
+            ]);
+
+            // 处理回调失败
+            $order = TransferOrder::find($this->orderId);
+            if ($order) {
+                $this->handleCallbackFailure($order);
+            }
+        }
+    }
+
+    /**
+     * 处理回调失败
+     */
+    private function handleCallbackFailure(TransferOrder $order): void
+    {
+        if ($this->retryCount < $this->maxRetries) {
+            // 安排重试
+            $nextRetryCount = $this->retryCount + 1;
+            $delayMinutes = $this->calculateRetryDelay($nextRetryCount);
+
+            Log::info('Scheduling callback retry', [
+                'order_id' => $order->id,
+                'retry_count' => $nextRetryCount,
+                'delay_minutes' => $delayMinutes
+            ]);
+
+            // 更新订单的重试计数
+            $callbackData = $order->callback_data;
+            $callbackData['retry_count'] = $nextRetryCount;
+            $callbackData['last_retry_at'] = now()->toDateTimeString();
+            $order->update(['callback_data' => $callbackData]);
+
+            // 调度重试任务
+            self::dispatch($order->id, $nextRetryCount, $this->maxRetries)
+                ->delay(now()->addMinutes($delayMinutes));
+
+        } else {
+            // 达到最大重试次数,记录失败
+            Log::error('Transfer callback max retries reached', [
+                'order_id' => $order->id,
+                'retry_count' => $this->retryCount,
+                'max_retries' => $this->maxRetries
+            ]);
+
+            // 更新订单状态为失败
+            $order->updateStatus(TransferStatus::FAILED, '回调发送失败,已达最大重试次数');
+        }
+    }
+
+    /**
+     * 计算重试延迟时间(指数退避)
+     */
+    private function calculateRetryDelay(int $retryCount): int
+    {
+        // 指数退避:1分钟、2分钟、4分钟
+        return min(pow(2, $retryCount - 1), 30); // 最大30分钟
+    }
+
+    /**
+     * 任务失败时的处理
+     */
+    public function failed(\Throwable $exception): void
+    {
+        Log::error('Transfer callback job failed permanently', [
+            'order_id' => $this->orderId,
+            'retry_count' => $this->retryCount,
+            'error' => $exception->getMessage()
+        ]);
+
+        // 更新订单状态为失败
+        $order = TransferOrder::find($this->orderId);
+        if ($order && !$order->isFinalStatus()) {
+            $order->updateStatus(TransferStatus::FAILED, '回调任务失败: ' . $exception->getMessage());
+        }
+    }
+
+    /**
+     * 获取任务的唯一ID
+     */
+    public function uniqueId(): string
+    {
+        return "transfer_callback_{$this->orderId}_{$this->retryCount}";
+    }
+
+    /**
+     * 静态方法:调度回调任务
+     */
+    public static function schedule(int $orderId, int $delayMinutes = 0): void
+    {
+        $job = new self($orderId);
+        
+        if ($delayMinutes > 0) {
+            $job->delay(now()->addMinutes($delayMinutes));
+        }
+        
+        dispatch($job);
+    }
+
+    /**
+     * 静态方法:立即发送回调
+     */
+    public static function sendNow(int $orderId): void
+    {
+        dispatch_sync(new self($orderId));
+    }
+
+    /**
+     * 静态方法:批量调度回调任务
+     */
+    public static function scheduleBatch(array $orderIds, int $delayMinutes = 0): void
+    {
+        foreach ($orderIds as $orderId) {
+            self::schedule($orderId, $delayMinutes);
+        }
+    }
+}