ElasticsearchHandler.php 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230
  1. <?php declare(strict_types=1);
  2. /*
  3. * This file is part of the Monolog package.
  4. *
  5. * (c) Jordi Boggiano <j.boggiano@seld.be>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Monolog\Handler;
  11. use Elastic\Elasticsearch\Response\Elasticsearch;
  12. use Throwable;
  13. use RuntimeException;
  14. use Monolog\Level;
  15. use Monolog\Formatter\FormatterInterface;
  16. use Monolog\Formatter\ElasticsearchFormatter;
  17. use InvalidArgumentException;
  18. use Elasticsearch\Common\Exceptions\RuntimeException as ElasticsearchRuntimeException;
  19. use Elasticsearch\Client;
  20. use Monolog\LogRecord;
  21. use Elastic\Elasticsearch\Exception\InvalidArgumentException as ElasticInvalidArgumentException;
  22. use Elastic\Elasticsearch\Client as Client8;
  23. /**
  24. * Elasticsearch handler
  25. *
  26. * @link https://www.elastic.co/guide/en/elasticsearch/client/php-api/current/index.html
  27. *
  28. * Simple usage example:
  29. *
  30. * $client = \Elasticsearch\ClientBuilder::create()
  31. * ->setHosts($hosts)
  32. * ->build();
  33. *
  34. * $options = array(
  35. * 'index' => 'elastic_index_name',
  36. * 'type' => 'elastic_doc_type',
  37. * );
  38. * $handler = new ElasticsearchHandler($client, $options);
  39. * $log = new Logger('application');
  40. * $log->pushHandler($handler);
  41. *
  42. * @author Avtandil Kikabidze <akalongman@gmail.com>
  43. * @phpstan-type Options array{
  44. * index: string,
  45. * type: string,
  46. * ignore_error: bool,
  47. * op_type: 'index'|'create'
  48. * }
  49. * @phpstan-type InputOptions array{
  50. * index?: string,
  51. * type?: string,
  52. * ignore_error?: bool,
  53. * op_type?: 'index'|'create'
  54. * }
  55. */
  56. class ElasticsearchHandler extends AbstractProcessingHandler
  57. {
  58. protected Client|Client8 $client;
  59. /**
  60. * @var mixed[] Handler config options
  61. * @phpstan-var Options
  62. */
  63. protected array $options;
  64. /**
  65. * @var bool
  66. */
  67. private $needsType;
  68. /**
  69. * @param Client|Client8 $client Elasticsearch Client object
  70. * @param mixed[] $options Handler configuration
  71. *
  72. * @phpstan-param InputOptions $options
  73. */
  74. public function __construct(Client|Client8 $client, array $options = [], int|string|Level $level = Level::Debug, bool $bubble = true)
  75. {
  76. parent::__construct($level, $bubble);
  77. $this->client = $client;
  78. $this->options = array_merge(
  79. [
  80. 'index' => 'monolog', // Elastic index name
  81. 'type' => '_doc', // Elastic document type
  82. 'ignore_error' => false, // Suppress Elasticsearch exceptions
  83. 'op_type' => 'index', // Elastic op_type (index or create) (https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html#docs-index-api-op_type)
  84. ],
  85. $options
  86. );
  87. if ($client instanceof Client8 || $client::VERSION[0] === '7') {
  88. $this->needsType = false;
  89. // force the type to _doc for ES8/ES7
  90. $this->options['type'] = '_doc';
  91. } else {
  92. $this->needsType = true;
  93. }
  94. }
  95. /**
  96. * @inheritDoc
  97. */
  98. protected function write(LogRecord $record): void
  99. {
  100. $this->bulkSend([$record->formatted]);
  101. }
  102. /**
  103. * @inheritDoc
  104. */
  105. public function setFormatter(FormatterInterface $formatter): HandlerInterface
  106. {
  107. if ($formatter instanceof ElasticsearchFormatter) {
  108. return parent::setFormatter($formatter);
  109. }
  110. throw new InvalidArgumentException('ElasticsearchHandler is only compatible with ElasticsearchFormatter');
  111. }
  112. /**
  113. * Getter options
  114. *
  115. * @return mixed[]
  116. *
  117. * @phpstan-return Options
  118. */
  119. public function getOptions(): array
  120. {
  121. return $this->options;
  122. }
  123. /**
  124. * @inheritDoc
  125. */
  126. protected function getDefaultFormatter(): FormatterInterface
  127. {
  128. return new ElasticsearchFormatter($this->options['index'], $this->options['type']);
  129. }
  130. /**
  131. * @inheritDoc
  132. */
  133. public function handleBatch(array $records): void
  134. {
  135. $documents = $this->getFormatter()->formatBatch($records);
  136. $this->bulkSend($documents);
  137. }
  138. /**
  139. * Use Elasticsearch bulk API to send list of documents
  140. *
  141. * @param array<array<mixed>> $records Records + _index/_type keys
  142. * @throws \RuntimeException
  143. */
  144. protected function bulkSend(array $records): void
  145. {
  146. try {
  147. $params = [
  148. 'body' => [],
  149. ];
  150. foreach ($records as $record) {
  151. $params['body'][] = [
  152. $this->options['op_type'] => $this->needsType ? [
  153. '_index' => $record['_index'],
  154. '_type' => $record['_type'],
  155. ] : [
  156. '_index' => $record['_index'],
  157. ],
  158. ];
  159. unset($record['_index'], $record['_type']);
  160. $params['body'][] = $record;
  161. }
  162. /** @var Elasticsearch */
  163. $responses = $this->client->bulk($params);
  164. if ($responses['errors'] === true) {
  165. throw $this->createExceptionFromResponses($responses);
  166. }
  167. } catch (Throwable $e) {
  168. if (! $this->options['ignore_error']) {
  169. throw new RuntimeException('Error sending messages to Elasticsearch', 0, $e);
  170. }
  171. }
  172. }
  173. /**
  174. * Creates elasticsearch exception from responses array
  175. *
  176. * Only the first error is converted into an exception.
  177. *
  178. * @param mixed[]|Elasticsearch $responses returned by $this->client->bulk()
  179. */
  180. protected function createExceptionFromResponses($responses): Throwable
  181. {
  182. foreach ($responses['items'] ?? [] as $item) {
  183. if (isset($item['index']['error'])) {
  184. return $this->createExceptionFromError($item['index']['error']);
  185. }
  186. }
  187. if (class_exists(ElasticInvalidArgumentException::class)) {
  188. return new ElasticInvalidArgumentException('Elasticsearch failed to index one or more records.');
  189. }
  190. return new ElasticsearchRuntimeException('Elasticsearch failed to index one or more records.');
  191. }
  192. /**
  193. * Creates elasticsearch exception from error array
  194. *
  195. * @param mixed[] $error
  196. */
  197. protected function createExceptionFromError(array $error): Throwable
  198. {
  199. $previous = isset($error['caused_by']) ? $this->createExceptionFromError($error['caused_by']) : null;
  200. if (class_exists(ElasticInvalidArgumentException::class)) {
  201. return new ElasticInvalidArgumentException($error['type'] . ': ' . $error['reason'], 0, $previous);
  202. }
  203. return new ElasticsearchRuntimeException($error['type'] . ': ' . $error['reason'], 0, $previous);
  204. }
  205. }