AmqpHandler.php 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. <?php declare(strict_types=1);
  2. /*
  3. * This file is part of the Monolog package.
  4. *
  5. * (c) Jordi Boggiano <j.boggiano@seld.be>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Monolog\Handler;
  11. use Monolog\Logger;
  12. use Monolog\Formatter\FormatterInterface;
  13. use Monolog\Formatter\JsonFormatter;
  14. use PhpAmqpLib\Message\AMQPMessage;
  15. use PhpAmqpLib\Channel\AMQPChannel;
  16. use AMQPExchange;
  17. class AmqpHandler extends AbstractProcessingHandler
  18. {
  19. /**
  20. * @var AMQPExchange|AMQPChannel $exchange
  21. */
  22. protected $exchange;
  23. /**
  24. * @var string
  25. */
  26. protected $exchangeName;
  27. /**
  28. * @param AMQPExchange|AMQPChannel $exchange AMQPExchange (php AMQP ext) or PHP AMQP lib channel, ready for use
  29. * @param string|null $exchangeName Optional exchange name, for AMQPChannel (PhpAmqpLib) only
  30. * @param string|int $level The minimum logging level at which this handler will be triggered
  31. * @param bool $bubble Whether the messages that are handled can bubble up the stack or not
  32. */
  33. public function __construct($exchange, ?string $exchangeName = null, $level = Logger::DEBUG, bool $bubble = true)
  34. {
  35. if ($exchange instanceof AMQPChannel) {
  36. $this->exchangeName = (string) $exchangeName;
  37. } elseif (!$exchange instanceof AMQPExchange) {
  38. throw new \InvalidArgumentException('PhpAmqpLib\Channel\AMQPChannel or AMQPExchange instance required');
  39. } elseif ($exchangeName) {
  40. @trigger_error('The $exchangeName parameter can only be passed when using PhpAmqpLib, if using an AMQPExchange instance configure it beforehand', E_USER_DEPRECATED);
  41. }
  42. $this->exchange = $exchange;
  43. parent::__construct($level, $bubble);
  44. }
  45. /**
  46. * {@inheritDoc}
  47. */
  48. protected function write(array $record): void
  49. {
  50. $data = $record["formatted"];
  51. $routingKey = $this->getRoutingKey($record);
  52. if ($this->exchange instanceof AMQPExchange) {
  53. $this->exchange->publish(
  54. $data,
  55. $routingKey,
  56. 0,
  57. [
  58. 'delivery_mode' => 2,
  59. 'content_type' => 'application/json',
  60. ]
  61. );
  62. } else {
  63. $this->exchange->basic_publish(
  64. $this->createAmqpMessage($data),
  65. $this->exchangeName,
  66. $routingKey
  67. );
  68. }
  69. }
  70. /**
  71. * {@inheritDoc}
  72. */
  73. public function handleBatch(array $records): void
  74. {
  75. if ($this->exchange instanceof AMQPExchange) {
  76. parent::handleBatch($records);
  77. return;
  78. }
  79. foreach ($records as $record) {
  80. if (!$this->isHandling($record)) {
  81. continue;
  82. }
  83. $record = $this->processRecord($record);
  84. $data = $this->getFormatter()->format($record);
  85. $this->exchange->batch_basic_publish(
  86. $this->createAmqpMessage($data),
  87. $this->exchangeName,
  88. $this->getRoutingKey($record)
  89. );
  90. }
  91. $this->exchange->publish_batch();
  92. }
  93. /**
  94. * Gets the routing key for the AMQP exchange
  95. */
  96. protected function getRoutingKey(array $record): string
  97. {
  98. $routingKey = sprintf('%s.%s', $record['level_name'], $record['channel']);
  99. return strtolower($routingKey);
  100. }
  101. private function createAmqpMessage(string $data): AMQPMessage
  102. {
  103. return new AMQPMessage(
  104. $data,
  105. [
  106. 'delivery_mode' => 2,
  107. 'content_type' => 'application/json',
  108. ]
  109. );
  110. }
  111. /**
  112. * {@inheritDoc}
  113. */
  114. protected function getDefaultFormatter(): FormatterInterface
  115. {
  116. return new JsonFormatter(JsonFormatter::BATCH_MODE_JSON, false);
  117. }
  118. }