Redis.php 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. <?php
  2. namespace App\Module\DelayQueue;
  3. use App\Module\DelayQueue\Entity\Queue;
  4. use App\Module\DelayQueue\Job\DelayQueueJob;
  5. use App\Module\DelayQueue\Job\Job;
  /**
   * Schedules delayed queue jobs and suppresses duplicate submissions.
   *
   * Each submission is identified by a Redis key derived from the target
   * class, the target method and an md5 of the serialized run parameters.
   * While that key exists, further submissions with the same identity are
   * rejected (return value 0) instead of being dispatched again.
   */
  class Redis
  {
      /** Key prefix used by addQueue(). */
      const E_KEY = 'delay_queue';

      /** Key prefix used by addQueue2() and getkey(). */
      const E_KEY2 = 'delay_queue2';

      /**
       * Queue a delayed call, unless an identical one was queued recently.
       *
       * The de-duplication key lives for ($delay - 1) seconds, i.e. it expires
       * just before the job's delay elapses.
       *
       * @param array $callback [class, method] pair; must be callable
       * @param mixed $runParam parameters stored on the queue entity
       * @param int   $delay    dispatch delay; values below 2 are raised to 2
       * @return int 1 when the job was dispatched, 0 when it was suppressed as a duplicate
       * @throws \Exception when $callback is not a callable [class, method] array
       */
      public static function addQueue($callback, $runParam, $delay = 3): int
      {
          self::assertClassMethodCallback($callback);

          $key = self::E_KEY . $callback[0] . $callback[1] . md5(serialize($runParam));
          $delay = self::normalizeDelay($delay);

          if (!self::acquireLock($key, max(1, $delay - 1))) {
              return 0;
          }

          Job::dispatch(self::buildQueue($callback, $runParam, $delay))->delay($delay);

          return 1;
      }

      /**
       * Enhanced variant of addQueue().
       *
       * Differences visible here: the key is built by getkey() (E_KEY2 prefix),
       * the key lives for max(100, $delay * 3) seconds, and the work is
       * dispatched as a DelayQueueJob instead of a Job.
       *
       * @param array $callback [class, method] pair; must be callable
       * @param mixed $runParam parameters stored on the queue entity
       * @param int   $delay    dispatch delay; values below 2 are raised to 2
       * @return int 1 when the job was dispatched, 0 when it was suppressed as a duplicate
       * @throws \Exception when $callback is not a callable [class, method] array
       */
      public static function addQueue2($callback, $runParam, $delay = 3): int
      {
          self::assertClassMethodCallback($callback);

          $key = self::getkey($callback, $runParam);
          $delay = self::normalizeDelay($delay);

          if (!self::acquireLock($key, max(100, $delay * 3))) {
              return 0;
          }

          DelayQueueJob::dispatch(self::buildQueue($callback, $runParam, $delay))->delay($delay);

          return 1;
      }

      /**
       * Build the de-duplication key used by addQueue2().
       *
       * @param array $callback [class, method] pair
       * @param mixed $runParam parameters that identify the submission
       * @return string E_KEY2 . class . method . md5(serialize($runParam))
       */
      public static function getkey($callback, $runParam)
      {
          return self::E_KEY2 . $callback[0] . $callback[1] . md5(serialize($runParam));
      }

      /**
       * Reject anything that is not a callable in [class, method] array form.
       *
       * is_callable() alone also accepts function-name strings and closures,
       * which the rest of this class cannot handle because it indexes
       * $callback[0] and $callback[1].
       *
       * @param mixed $callback
       * @throws \Exception
       */
      private static function assertClassMethodCallback($callback)
      {
          if (!is_array($callback) || !is_callable($callback)) {
              throw new \Exception('callback is not callable');
          }
      }

      /**
       * Coerce the requested delay to an integer of at least 2.
       *
       * @param mixed $delay
       * @return int
       */
      private static function normalizeDelay($delay): int
      {
          return max(2, (int) $delay);
      }

      /**
       * Atomically create the de-duplication key if it does not exist yet.
       *
       * Uses a single SET with NX + EX so that two concurrent callers cannot
       * both pass an "exists" check and both dispatch the same job.
       *
       * NOTE(review): the options-array form of set() is the phpredis API,
       * matching the "@var \Redis" annotation this file already relied on.
       * Confirm the application is not configured for predis.
       *
       * @param string $key
       * @param int    $ttl key lifetime in seconds
       * @return bool true when this caller created the key, false when it already existed
       */
      private static function acquireLock(string $key, int $ttl): bool
      {
          /** @var \Redis $client */
          $client = \Illuminate\Support\Facades\Redis::client();

          return (bool) $client->set($key, 1, ['nx', 'ex' => $ttl]);
      }

      /**
       * Populate the queue entity handed to the job.
       *
       * @param array $callback [class, method] pair
       * @param mixed $runParam
       * @param int   $delay    normalized delay
       * @return Queue
       */
      private static function buildQueue(array $callback, $runParam, int $delay): Queue
      {
          $queue = new Queue();
          $queue->create_ts = time();
          $queue->delay_ts = $delay;
          $queue->runClass = $callback[0];
          $queue->runMethod = $callback[1];
          $queue->runParam = $runParam;

          return $queue;
      }
  }