ElasticsearchHandler.php 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. <?php declare(strict_types=1);
  2. /*
  3. * This file is part of the Monolog package.
  4. *
  5. * (c) Jordi Boggiano <j.boggiano@seld.be>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Monolog\Handler;
  11. use Monolog\LevelName;
  12. use Elastic\Elasticsearch\Response\Elasticsearch;
  13. use Throwable;
  14. use RuntimeException;
  15. use Monolog\Level;
  16. use Monolog\Formatter\FormatterInterface;
  17. use Monolog\Formatter\ElasticsearchFormatter;
  18. use InvalidArgumentException;
  19. use Elasticsearch\Common\Exceptions\RuntimeException as ElasticsearchRuntimeException;
  20. use Elasticsearch\Client;
  21. use Monolog\LogRecord;
  22. use Elastic\Elasticsearch\Exception\InvalidArgumentException as ElasticInvalidArgumentException;
  23. use Elastic\Elasticsearch\Client as Client8;
  /**
   * Elasticsearch handler
   *
   * Sends formatted log records to Elasticsearch through the client's bulk API.
   *
   * @link https://www.elastic.co/guide/en/elasticsearch/client/php-api/current/index.html
   *
   * Simple usage example:
   *
   *    $client = \Elasticsearch\ClientBuilder::create()
   *        ->setHosts($hosts)
   *        ->build();
   *
   *    $options = [
   *        'index' => 'elastic_index_name',
   *        'type'  => 'elastic_doc_type',
   *    ];
   *    $handler = new ElasticsearchHandler($client, $options);
   *    $log = new Logger('application');
   *    $log->pushHandler($handler);
   *
   * Note: the 'type' option is overwritten with '_doc' when the client is an
   * Elastic\Elasticsearch\Client instance or its VERSION constant starts with '7'.
   *
   * @author Avtandil Kikabidze <akalongman@gmail.com>
   * @phpstan-type Options array{
   *     index: string,
   *     type: string,
   *     ignore_error: bool
   * }
   * @phpstan-type InputOptions array{
   *     index?: string,
   *     type?: string,
   *     ignore_error?: bool
   * }
   */
  class ElasticsearchHandler extends AbstractProcessingHandler
  {
      protected Client|Client8 $client;

      /**
       * @var mixed[] Handler config options
       * @phpstan-var Options
       */
      protected array $options;

      /**
       * Whether the bulk action metadata must carry a `_type` key.
       * Assigned once in the constructor from the client class/version.
       */
      private bool $needsType;

      /**
       * @param Client|Client8 $client  Elasticsearch Client object
       * @param mixed[]        $options Handler configuration, merged over the defaults
       *                                (index: 'monolog', type: '_doc', ignore_error: false)
       *
       * @phpstan-param InputOptions $options
       */
      public function __construct(
          Client|Client8 $client,
          array $options = [],
          int|string|Level|LevelName $level = Level::Debug,
          bool $bubble = true,
      ) {
          parent::__construct($level, $bubble);

          $this->client = $client;
          $this->options = array_merge(
              [
                  'index'        => 'monolog', // Elastic index name
                  'type'         => '_doc',    // Elastic document type
                  'ignore_error' => false,     // Suppress Elasticsearch exceptions
              ],
              $options
          );

          if ($client instanceof Client8 || $client::VERSION[0] === '7') {
              $this->needsType = false;
              // force the type to _doc for ES8/ES7
              $this->options['type'] = '_doc';
          } else {
              $this->needsType = true;
          }
      }

      /**
       * Sends the already formatted record as a one-document bulk request.
       *
       * @inheritDoc
       */
      protected function write(LogRecord $record): void
      {
          $this->bulkSend([$record->formatted]);
      }

      /**
       * Accepts only ElasticsearchFormatter instances.
       *
       * @inheritDoc
       *
       * @throws InvalidArgumentException when any other formatter is given
       */
      public function setFormatter(FormatterInterface $formatter): HandlerInterface
      {
          if ($formatter instanceof ElasticsearchFormatter) {
              return parent::setFormatter($formatter);
          }

          throw new InvalidArgumentException('ElasticsearchHandler is only compatible with ElasticsearchFormatter');
      }

      /**
       * Getter options
       *
       * @return mixed[] The defaults merged with the options given to the constructor
       *
       * @phpstan-return Options
       */
      public function getOptions(): array
      {
          return $this->options;
      }

      /**
       * Builds an ElasticsearchFormatter from the configured index and type.
       *
       * @inheritDoc
       */
      protected function getDefaultFormatter(): FormatterInterface
      {
          return new ElasticsearchFormatter($this->options['index'], $this->options['type']);
      }

      /**
       * Formats all records with the current formatter and sends them in one bulk call.
       *
       * @inheritDoc
       */
      public function handleBatch(array $records): void
      {
          $documents = $this->getFormatter()->formatBatch($records);
          $this->bulkSend($documents);
      }

      /**
       * Use Elasticsearch bulk API to send list of documents
       *
       * Each document contributes two entries to the request body: the `index`
       * action metadata, followed by the document itself with the `_index` and
       * `_type` routing keys stripped. An empty list is a no-op.
       *
       * @param  array<array<mixed>> $records Records + _index/_type keys
       * @throws \RuntimeException   When sending fails and the 'ignore_error' option is not enabled
       */
      protected function bulkSend(array $records): void
      {
          // Nothing to index: skip the round-trip instead of posting an empty
          // bulk body (which the Bulk API does not accept as a valid request).
          if ($records === []) {
              return;
          }

          try {
              $params = [
                  'body' => [],
              ];

              foreach ($records as $record) {
                  $params['body'][] = [
                      'index' => $this->needsType ? [
                          '_index' => $record['_index'],
                          '_type'  => $record['_type'],
                      ] : [
                          '_index' => $record['_index'],
                      ],
                  ];
                  unset($record['_index'], $record['_type']);

                  $params['body'][] = $record;
              }

              /** @var Elasticsearch */
              $responses = $this->client->bulk($params);

              if ($responses['errors'] === true) {
                  throw $this->createExceptionFromResponses($responses);
              }
          } catch (Throwable $e) {
              // Every failure in the block above (including the exception built
              // from the bulk response) is either swallowed or wrapped here.
              if (! $this->options['ignore_error']) {
                  throw new RuntimeException('Error sending messages to Elasticsearch', 0, $e);
              }
          }
      }

      /**
       * Creates elasticsearch exception from responses array
       *
       * Only the first error is converted into an exception. When no item carries
       * an `index.error` entry, a generic exception is returned instead.
       *
       * @param mixed[]|Elasticsearch $responses returned by $this->client->bulk()
       */
      protected function createExceptionFromResponses($responses): Throwable
      {
          foreach ($responses['items'] ?? [] as $item) {
              if (isset($item['index']['error'])) {
                  return $this->createExceptionFromError($item['index']['error']);
              }
          }

          return $this->createElasticException('Elasticsearch failed to index one or more records.');
      }

      /**
       * Creates elasticsearch exception from error array
       *
       * A nested `caused_by` entry is converted recursively and attached as the
       * previous exception.
       *
       * @param mixed[] $error
       */
      protected function createExceptionFromError(array $error): Throwable
      {
          $previous = isset($error['caused_by']) ? $this->createExceptionFromError($error['caused_by']) : null;

          return $this->createElasticException($error['type'] . ': ' . $error['reason'], $previous);
      }

      /**
       * Instantiates whichever Elasticsearch client exception class is available:
       * the Elastic\Elasticsearch one when that class exists, the
       * Elasticsearch\Common\Exceptions one otherwise.
       */
      private function createElasticException(string $message, ?Throwable $previous = null): Throwable
      {
          if (class_exists(ElasticInvalidArgumentException::class)) {
              return new ElasticInvalidArgumentException($message, 0, $previous);
          }

          return new ElasticsearchRuntimeException($message, 0, $previous);
      }
  }