ElasticsearchHandler.php 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. <?php declare(strict_types=1);
  2. /*
  3. * This file is part of the Monolog package.
  4. *
  5. * (c) Jordi Boggiano <j.boggiano@seld.be>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Monolog\Handler;
  11. use Throwable;
  12. use RuntimeException;
  13. use Monolog\Logger;
  14. use Monolog\Formatter\FormatterInterface;
  15. use Monolog\Formatter\ElasticsearchFormatter;
  16. use InvalidArgumentException;
  17. use Elasticsearch\Common\Exceptions\RuntimeException as ElasticsearchRuntimeException;
  18. use Elasticsearch\Client;
  19. /**
  20. * Elasticsearch handler
  21. *
  22. * @link https://www.elastic.co/guide/en/elasticsearch/client/php-api/current/index.html
  23. *
  24. * Simple usage example:
  25. *
  26. * $client = \Elasticsearch\ClientBuilder::create()
  27. * ->setHosts($hosts)
  28. * ->build();
  29. *
  30. * $options = array(
  31. * 'index' => 'elastic_index_name',
  32. * 'type' => 'elastic_doc_type',
  33. * );
  34. * $handler = new ElasticsearchHandler($client, $options);
  35. * $log = new Logger('application');
  36. * $log->pushHandler($handler);
  37. *
  38. * @author Avtandil Kikabidze <akalongman@gmail.com>
  39. */
  40. class ElasticsearchHandler extends AbstractProcessingHandler
  41. {
  42. /**
  43. * @var Client
  44. */
  45. protected $client;
  46. /**
  47. * @var array Handler config options
  48. */
  49. protected $options = [];
  50. /**
  51. * @param Client $client Elasticsearch Client object
  52. * @param array $options Handler configuration
  53. * @param string|int $level The minimum logging level at which this handler will be triggered
  54. * @param bool $bubble Whether the messages that are handled can bubble up the stack or not
  55. */
  56. public function __construct(Client $client, array $options = [], $level = Logger::DEBUG, bool $bubble = true)
  57. {
  58. parent::__construct($level, $bubble);
  59. $this->client = $client;
  60. $this->options = array_merge(
  61. [
  62. 'index' => 'monolog', // Elastic index name
  63. 'type' => '_doc', // Elastic document type
  64. 'ignore_error' => false, // Suppress Elasticsearch exceptions
  65. ],
  66. $options
  67. );
  68. }
  69. /**
  70. * {@inheritDoc}
  71. */
  72. protected function write(array $record): void
  73. {
  74. $this->bulkSend([$record['formatted']]);
  75. }
  76. /**
  77. * {@inheritdoc}
  78. */
  79. public function setFormatter(FormatterInterface $formatter): HandlerInterface
  80. {
  81. if ($formatter instanceof ElasticsearchFormatter) {
  82. return parent::setFormatter($formatter);
  83. }
  84. throw new InvalidArgumentException('ElasticsearchHandler is only compatible with ElasticsearchFormatter');
  85. }
  86. /**
  87. * Getter options
  88. *
  89. * @return array
  90. */
  91. public function getOptions(): array
  92. {
  93. return $this->options;
  94. }
  95. /**
  96. * {@inheritDoc}
  97. */
  98. protected function getDefaultFormatter(): FormatterInterface
  99. {
  100. return new ElasticsearchFormatter($this->options['index'], $this->options['type']);
  101. }
  102. /**
  103. * {@inheritdoc}
  104. */
  105. public function handleBatch(array $records): void
  106. {
  107. $documents = $this->getFormatter()->formatBatch($records);
  108. $this->bulkSend($documents);
  109. }
  110. /**
  111. * Use Elasticsearch bulk API to send list of documents
  112. *
  113. * @param array $records
  114. * @throws \RuntimeException
  115. */
  116. protected function bulkSend(array $records): void
  117. {
  118. try {
  119. $params = [
  120. 'body' => [],
  121. ];
  122. foreach ($records as $record) {
  123. $params['body'][] = [
  124. 'index' => [
  125. '_index' => $record['_index'],
  126. '_type' => $record['_type'],
  127. ],
  128. ];
  129. unset($record['_index'], $record['_type']);
  130. $params['body'][] = $record;
  131. }
  132. $responses = $this->client->bulk($params);
  133. if ($responses['errors'] === true) {
  134. throw $this->createExceptionFromResponses($responses);
  135. }
  136. } catch (Throwable $e) {
  137. if (! $this->options['ignore_error']) {
  138. throw new RuntimeException('Error sending messages to Elasticsearch', 0, $e);
  139. }
  140. }
  141. }
  142. /**
  143. * Creates elasticsearch exception from responses array
  144. *
  145. * Only the first error is converted into an exception.
  146. *
  147. * @param array $responses returned by $this->client->bulk()
  148. */
  149. protected function createExceptionFromResponses(array $responses): ElasticsearchRuntimeException
  150. {
  151. foreach ($responses['items'] ?? [] as $item) {
  152. if (isset($item['index']['error'])) {
  153. return $this->createExceptionFromError($item['index']['error']);
  154. }
  155. }
  156. return new ElasticsearchRuntimeException('Elasticsearch failed to index one or more records.');
  157. }
  158. /**
  159. * Creates elasticsearch exception from error array
  160. *
  161. * @param array $error
  162. */
  163. protected function createExceptionFromError(array $error): ElasticsearchRuntimeException
  164. {
  165. $previous = isset($error['caused_by']) ? $this->createExceptionFromError($error['caused_by']) : null;
  166. return new ElasticsearchRuntimeException($error['type'] . ': ' . $error['reason'], 0, $previous);
  167. }
  168. }