Redis.php 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. <?php
  2. namespace App\Module\DelayQueue;
  3. use App\Module\DelayQueue\Entity\Queue;
  4. use App\Module\DelayQueue\Job\DelayArrayJob;
  5. use App\Module\DelayQueue\Job\DelayQueueJob;
  6. use App\Module\DelayQueue\Job\Job;
  7. class Redis
  8. {
  9. const E_KEY = 'delay_queue';
  10. const E_KEY2 = 'delay_queue2';
  11. /**
  12. * 添加队列
  13. *
  14. * @param array $callback
  15. * @param $runParam
  16. * @param int $delay
  17. * @return int
  18. * @deprecated
  19. */
  20. static public function addQueue($callback, $runParam, $delay = 3): int
  21. {
  22. if (!is_callable($callback)) {
  23. throw new \Exception('callback is not callable');
  24. }
  25. $key = self::E_KEY . $callback[0] . $callback[1] . md5(serialize($runParam));
  26. /**
  27. * @var \Redis $a
  28. */
  29. $a = \Illuminate\Support\Facades\Redis::client();
  30. if ($a->exists($key)) {
  31. return 0;
  32. }
  33. $delay = max(2, $delay);
  34. $a->setex($key, max(1, $delay - 1), 1);
  35. $q = new Queue();
  36. $q->create_ts = time();
  37. $q->delay_ts = $delay;
  38. $q->runClass = $callback[0];
  39. $q->runMethod = $callback[1];
  40. $q->runParam = $runParam;
  41. Job::dispatch($q)->delay($delay);
  42. return 1;
  43. }
  44. /**
  45. * 增强版本
  46. *
  47. * @param $callback
  48. * @param $runParam
  49. * @param $delay
  50. * @return int
  51. * @throws \Exception
  52. */
  53. static public function addQueue2($callback, $runParam, $delay = 3): int
  54. {
  55. if (!is_callable($callback)) {
  56. throw new \Exception('callback is not callable');
  57. }
  58. $key = self::getkey($callback, $runParam);
  59. /**
  60. * @var \Redis $a
  61. */
  62. $a = \Illuminate\Support\Facades\Redis::client();
  63. if ($a->exists($key)) {
  64. return 0;
  65. }
  66. $delay = max(2, $delay);
  67. $a->setex($key, max(100, $delay * 3), 1);
  68. $q = new Queue();
  69. $q->create_ts = time();
  70. $q->delay_ts = $delay;
  71. $q->runClass = $callback[0];
  72. $q->runMethod = $callback[1];
  73. $q->runParam = $runParam;
  74. DelayQueueJob::dispatch($q)->delay($delay);
  75. // Job::dispatch($q)->delay($delay);
  76. return 1;
  77. }
  78. /**
  79. * 获取缓存Key
  80. *
  81. * @param $callback
  82. * @param $runParam
  83. * @return string
  84. */
  85. public static function getkey($callback, $runParam)
  86. {
  87. return self::E_KEY2 . $callback[0] . $callback[1] . md5(serialize($runParam));
  88. }
  89. /**
  90. * 增强版本-Array参数
  91. *
  92. * @param $callback
  93. * @param $runParam
  94. * @param $delay
  95. * @return int
  96. * @throws \Exception
  97. */
  98. static public function addQueueArr($callback, $runParam, $delay = 3): int
  99. {
  100. if (!is_callable($callback)) {
  101. throw new \Exception('callback is not callable');
  102. }
  103. $key = self::getkey($callback, $runParam);
  104. /**
  105. * @var \Redis $a
  106. */
  107. $a = \Illuminate\Support\Facades\Redis::client();
  108. if ($a->exists($key)) {
  109. return 0;
  110. }
  111. $delay = max(2, $delay);
  112. $a->setex($key, max(100, $delay * 3), 1);
  113. $q = new Queue();
  114. $q->create_ts = time();
  115. $q->delay_ts = $delay;
  116. $q->runClass = $callback[0];
  117. $q->runMethod = $callback[1];
  118. $q->runParam = $runParam;
  119. DelayArrayJob::dispatch($q)->delay($delay);
  120. // Job::dispatch($q)->delay($delay);
  121. return 1;
  122. }
  123. }