Explorar o código

Merge pull request #251 from skymeyer/elastic_search2

ElasticSearchHandler and ElasticaFormatter
Jordi Boggiano %!s(int64=12) %!d(string=hai) anos
pai
achega
eea4d089e7

+ 2 - 0
README.mdown

@@ -148,6 +148,7 @@ Handlers
   [Mongo](http://pecl.php.net/package/mongo) extension connection.
 - _CouchDBHandler_: Logs records to a CouchDB server.
 - _DoctrineCouchDBHandler_: Logs records to a CouchDB server via the Doctrine CouchDB ODM.
+- _ElasticSearchHandler_: Logs records to an Elastic Search server.
 
 ### Wrappers / Special Handlers
 
@@ -181,6 +182,7 @@ Formatters
 - _ChromePHPFormatter_: Used to format log records into the ChromePHP format, only useful for the ChromePHPHandler.
 - _GelfFormatter_: Used to format log records into Gelf message instances, only useful for the GelfHandler.
 - _LogstashFormatter_: Used to format log records into [logstash](http://logstash.net/) event json, useful for any handler listed under inputs [here](http://logstash.net/docs/1.1.5/).
+- _ElasticaFormatter_: Used to format log records into an Elastica\Document object, only useful for the ElasticSearchHandler.
 
 Processors
 ----------

+ 2 - 0
composer.json

@@ -20,12 +20,14 @@
         "phpunit/phpunit": "~3.7.0",
         "mlehner/gelf-php": "1.0.*",
         "raven/raven": "0.5.*",
+        "ruflin/elastica": "0.90.*",
         "doctrine/couchdb": "dev-master"
     },
     "suggest": {
         "mlehner/gelf-php": "Allow sending log messages to a GrayLog2 server",
         "raven/raven": "Allow sending log messages to a Sentry server",
         "doctrine/couchdb": "Allow sending log messages to a CouchDB server",
+        "ruflin/elastica": "Allow sending log messages to an Elastic Search server",
         "ext-amqp": "Allow sending log messages to an AMQP server (1.0+ required)",
         "ext-mongo": "Allow sending log messages to a MongoDB server"
     },

+ 85 - 0
src/Monolog/Formatter/ElasticaFormatter.php

@@ -0,0 +1,85 @@
+<?php
+
+/*
+ * This file is part of the Monolog package.
+ *
+ * (c) Jordi Boggiano <j.boggiano@seld.be>
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Monolog\Formatter;
+
+use Elastica\Document;
+
+/**
+ * Format a log message into an Elastica Document
+ *
+ * @author Jelle Vink <jelle.vink@gmail.com>
+ */
+class ElasticaFormatter extends NormalizerFormatter
+{
+    /**
+     * @var string Elastic search index name
+     */
+    protected $index;
+
+    /**
+     * @var string Elastic search document type
+     */
+    protected $type;
+
+    /**
+     * @param string $index Elastic Search index name
+     * @param string $type  Elastic Search document type
+     */
+    public function __construct($index, $type)
+    {
+        parent::__construct(\DateTime::ISO8601);
+        $this->index = $index;
+        $this->type = $type;
+    }
+
+    /**
+     * {@inheritdoc}
+     */
+    public function format(array $record)
+    {
+        $record = parent::format($record);
+        return $this->getDocument($record);
+    }
+
+    /**
+     * Getter index
+     * @return string
+     */
+    public function getIndex()
+    {
+        return $this->index;
+    }
+
+    /**
+     * Getter type
+     * @return string
+     */
+    public function getType()
+    {
+        return $this->type;
+    }
+
+    /**
+     * Convert a log message into an Elastica Document
+     *
+     * @param array  $record Log message
+     * @return Document
+     */
+    protected function getDocument($record)
+    {
+        $document = new Document();
+        $document->setData($record);
+        $document->setType($this->type);
+        $document->setIndex($this->index);
+        return $document;
+    }
+}

+ 128 - 0
src/Monolog/Handler/ElasticSearchHandler.php

@@ -0,0 +1,128 @@
+<?php
+
+/*
+ * This file is part of the Monolog package.
+ *
+ * (c) Jordi Boggiano <j.boggiano@seld.be>
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Monolog\Handler;
+
+use Monolog\Formatter\FormatterInterface;
+use Monolog\Formatter\ElasticaFormatter;
+use Monolog\Logger;
+use Elastica\Client;
+use Elastica\Exception\ExceptionInterface;
+
+/**
+ * Elastic Search handler
+ *
+ * Usage example:
+ *
+ *    $client = new \Elastica\Client();
+ *    $options = array(
+ *        'index' => 'elastic_index_name',
+ *        'type' => 'elastic_doc_type',
+ *    );
+ *    $handler = new ElasticSearchHandler($client, $options);
+ *    $log = new Logger('application');
+ *    $log->pushHandler($handler);
+ *
+ * @author Jelle Vink <jelle.vink@gmail.com>
+ */
+class ElasticSearchHandler extends AbstractProcessingHandler
+{
+    /**
+     * @var Client
+     */
+    protected $client;
+
+    /**
+     * @var array Handler config options
+     */
+    protected $options = array();
+
+    /**
+     * @param Client  $client   Elastica Client object
+     * @param array   $options  Handler configuration
+     * @param integer $level    The minimum logging level at which this handler will be triggered
+     * @param Boolean $bubble   Whether the messages that are handled can bubble up the stack or not
+     */
+    public function __construct(Client $client, array $options = array(), $level = Logger::DEBUG, $bubble = true)
+    {
+        parent::__construct($level, $bubble);
+        $this->client = $client;
+        $this->options = array_merge(
+            array(
+                'index'          => 'monolog',      // Elastic index name
+                'type'           => 'record',       // Elastic document type
+                'ignore_error'   => false,          // Suppress Elastica exceptions
+            ),
+            $options
+        );
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    protected function write(array $record)
+    {
+        $this->bulkSend(array($record['formatted']));
+    }
+
+    /**
+     * {@inheritdoc}
+     */
+    public function setFormatter(FormatterInterface $formatter)
+    {
+        if ($formatter instanceof ElasticaFormatter) {
+            return parent::setFormatter($formatter);
+        }
+        throw new \InvalidArgumentException('ElasticSearchHandler is only compatible with ElasticaFormatter');
+    }
+
+    /**
+     * Getter options
+     * @return array
+     */
+    public function getOptions()
+    {
+        return $this->options;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    protected function getDefaultFormatter()
+    {
+        return new ElasticaFormatter($this->options['index'], $this->options['type']);
+    }
+
+    /**
+     * {@inheritdoc}
+     */
+    public function handleBatch(array $records)
+    {
+        $documents = $this->getFormatter()->formatBatch($records);
+        $this->bulkSend($documents);
+    }
+
+    /**
+     * Use Elasticsearch bulk API to send list of documents
+     * @param array $documents
+     * @throws \RuntimeException
+     */
+    protected function bulkSend(array $documents)
+    {
+        try {
+            $this->client->addDocuments($documents);
+        } catch (ExceptionInterface $e) {
+            if (!$this->options['ignore_error']) {
+                throw new \RuntimeException("Error sending messages to Elasticsearch", 0, $e);
+            }
+        }
+    }
+}

+ 80 - 0
tests/Monolog/Formatter/ElasticaFormatterTest.php

@@ -0,0 +1,80 @@
+<?php
+
+/*
+ * This file is part of the Monolog package.
+ *
+ * (c) Jordi Boggiano <j.boggiano@seld.be>
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Monolog\Formatter;
+
+use Monolog\Logger;
+use Monolog\Formatter\ElasticaFormatter;
+
+class ElasticaFormatterTest extends \PHPUnit_Framework_TestCase
+{
+    public function setUp()
+    {
+        if (!class_exists("Elastica\Document")) {
+            $this->markTestSkipped("ruflin/elastica not installed");
+        }
+    }
+
+    /**
+     * @covers Monolog\Formatter\ElasticaFormatter::__construct
+     * @covers Monolog\Formatter\ElasticaFormatter::format
+     * @covers Monolog\Formatter\ElasticaFormatter::getDocument
+     */
+    public function testFormat()
+    {
+        // test log message
+        $msg = array(
+            'level' => Logger::ERROR,
+            'level_name' => 'ERROR',
+            'channel' => 'meh',
+            'context' => array('foo' => 7, 'bar', 'class' => new \stdClass),
+            'datetime' => new \DateTime("@0"),
+            'extra' => array(),
+            'message' => 'log',
+        );
+
+        // expected values
+        $expected = $msg;
+        $expected['datetime'] = '1970-01-01T00:00:00+0000';
+        $expected['context'] = array(
+            'class' => '[object] (stdClass: {})',
+            'foo' => 7,
+            0 => 'bar',
+        );
+
+        // format log message
+        $formatter = new ElasticaFormatter('my_index', 'doc_type');
+        $doc = $formatter->format($msg);
+        $this->assertInstanceOf('Elastica\Document', $doc);
+
+        // Document parameters
+        $params = $doc->getParams();
+        $this->assertEquals('my_index', $params['_index']);
+        $this->assertEquals('doc_type', $params['_type']);
+
+        // Document data values
+        $data = $doc->getData();
+        foreach (array_keys($expected) as $key) {
+            $this->assertEquals($expected[$key], $data[$key]);
+        }
+    }
+
+    /**
+     * @covers Monolog\Formatter\ElasticaFormatter::getIndex
+     * @covers Monolog\Formatter\ElasticaFormatter::getType
+     */
+    public function testGetters()
+    {
+        $formatter = new ElasticaFormatter('my_index', 'doc_type');
+        $this->assertEquals('my_index', $formatter->getIndex());
+        $this->assertEquals('doc_type', $formatter->getType());
+    }
+}

+ 239 - 0
tests/Monolog/Handler/ElasticSearchHandlerTest.php

@@ -0,0 +1,239 @@
+<?php
+
+/*
+ * This file is part of the Monolog package.
+ *
+ * (c) Jordi Boggiano <j.boggiano@seld.be>
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Monolog\Handler;
+
+use Monolog\Handler\ElasticSearchHandler;
+use Monolog\Formatter\ElasticaFormatter;
+use Monolog\Formatter\NormalizerFormatter;
+use Monolog\TestCase;
+use Monolog\Logger;
+use Elastica\Client;
+use Elastica\Request;
+use Elastica\Response;
+
+class ElasticSearchHandlerTest extends TestCase
+{
+    /**
+     * @var Client mock
+     */
+    protected $client;
+
+    /**
+     * @var array Default handler options
+     */
+    protected $options = array(
+        'index' => 'my_index',
+        'type'  => 'doc_type',
+    );
+
+    public function setUp()
+    {
+        // Elastica lib required
+        if (!class_exists("Elastica\Client")) {
+            $this->markTestSkipped("ruflin/elastica not installed");
+        }
+
+        // base mock Elastica Client object
+        $this->client = $this->getMockBuilder('Elastica\Client')
+            ->setMethods(array('addDocuments'))
+            ->disableOriginalConstructor()
+            ->getMock();
+    }
+
+    /**
+     * @covers Monolog\Handler\ElasticSearchHandler::write
+     * @covers Monolog\Handler\ElasticSearchHandler::handleBatch
+     * @covers Monolog\Handler\ElasticSearchHandler::bulkSend
+     * @covers Monolog\Handler\ElasticSearchHandler::getDefaultFormatter
+     */
+    public function testHandle()
+    {
+        // log message
+        $msg = array(
+            'level' => Logger::ERROR,
+            'level_name' => 'ERROR',
+            'channel' => 'meh',
+            'context' => array('foo' => 7, 'bar', 'class' => new \stdClass),
+            'datetime' => new \DateTime("@0"),
+            'extra' => array(),
+            'message' => 'log',
+        );
+
+        // format expected result
+        $formatter = new ElasticaFormatter($this->options['index'], $this->options['type']);
+        $expected = array($formatter->format($msg));
+
+        // setup ES client mock
+        $this->client->expects($this->any())
+            ->method('addDocuments')
+            ->with($expected);
+
+        // perform tests
+        $handler = new ElasticSearchHandler($this->client, $this->options);
+        $handler->handle($msg);
+        $handler->handleBatch(array($msg));
+    }
+
+    /**
+     * @covers Monolog\Handler\ElasticSearchHandler::setFormatter
+     */
+    public function testSetFormatter()
+    {
+        $handler = new ElasticSearchHandler($this->client);
+        $formatter = new ElasticaFormatter('index_new', 'type_new');
+        $handler->setFormatter($formatter);
+        $this->assertInstanceOf('Monolog\Formatter\ElasticaFormatter', $handler->getFormatter());
+        $this->assertEquals('index_new', $handler->getFormatter()->getIndex());
+        $this->assertEquals('type_new', $handler->getFormatter()->getType());
+    }
+
+    /**
+     * @covers                   Monolog\Handler\ElasticSearchHandler::setFormatter
+     * @expectedException        InvalidArgumentException
+     * @expectedExceptionMessage ElasticSearchHandler is only compatible with ElasticaFormatter
+     */
+    public function testSetFormatterInvalid()
+    {
+        $handler = new ElasticSearchHandler($this->client);
+        $formatter = new NormalizerFormatter();
+        $handler->setFormatter($formatter);
+    }
+
+    /**
+     * @covers Monolog\Handler\ElasticSearchHandler::__construct
+     * @covers Monolog\Handler\ElasticSearchHandler::getOptions
+     */
+    public function testOptions()
+    {
+        $expected = array(
+            'index' => $this->options['index'],
+            'type' => $this->options['type'],
+            'ignore_error' => false,
+        );
+        $handler = new ElasticSearchHandler($this->client, $this->options);
+        $this->assertEquals($expected, $handler->getOptions());
+    }
+
+    /**
+     * @covers       Monolog\Handler\ElasticSearchHandler::bulkSend
+     * @dataProvider providerTestConnectionErrors
+     */
+    public function testConnectionErrors($ignore, $expectedError)
+    {
+        $clientOpts = array('host' => '127.0.0.1', 'port' => 1);
+        $client = new Client($clientOpts);
+        $handlerOpts = array('ignore_error' => $ignore);
+        $handler = new ElasticSearchHandler($client, $handlerOpts);
+
+        if ($expectedError) {
+            $this->setExpectedException($expectedError[0], $expectedError[1]);
+            $handler->handle($this->getRecord());
+        } else {
+            $this->assertFalse($handler->handle($this->getRecord()));
+        }
+    }
+
+    /**
+     * @return array
+     */
+    public function providerTestConnectionErrors()
+    {
+        return array(
+            array(false, array('RuntimeException', 'Error sending messages to Elasticsearch')),
+            array(true, false),
+        );
+    }
+
+    /**
+     * Integration test using localhost Elastic Search server
+     *
+     * @covers Monolog\Handler\ElasticSearchHandler::__construct
+     * @covers Monolog\Handler\ElasticSearchHandler::handleBatch
+     * @covers Monolog\Handler\ElasticSearchHandler::bulkSend
+     * @covers Monolog\Handler\ElasticSearchHandler::getDefaultFormatter
+     */
+    public function testHandleIntegration()
+    {
+        $msg = array(
+            'level' => Logger::ERROR,
+            'level_name' => 'ERROR',
+            'channel' => 'meh',
+            'context' => array('foo' => 7, 'bar', 'class' => new \stdClass),
+            'datetime' => new \DateTime("@0"),
+            'extra' => array(),
+            'message' => 'log',
+        );
+
+        $expected = $msg;
+        $expected['datetime'] = $msg['datetime']->format(\DateTime::ISO8601);
+        $expected['context'] = array(
+            'class' => '[object] (stdClass: {})',
+            'foo' => 7,
+            0 => 'bar',
+        );
+
+        $client = new Client();
+        $handler = new ElasticSearchHandler($client, $this->options);
+        try {
+            $handler->handleBatch(array($msg));
+        } catch(\RuntimeException $e) {
+            $this->markTestSkipped("Cannot connect to Elastic Search server on localhost");
+        }
+
+        // check document id from ES server response
+        $documentId = $this->getCreatedDocId($client->getLastResponse());
+        $this->assertNotEmpty($documentId, 'No elastic document id received');
+
+        // retrieve document source from ES and validate
+        $document = $this->getDocSourceFromElastic(
+            $client,
+            $this->options['index'],
+            $this->options['type'],
+            $documentId
+        );
+        $this->assertEquals($expected, $document);
+
+        // remove test index from ES
+        $client->request("/{$this->options['index']}", Request::DELETE);
+    }
+
+    /**
+     * Return last created document id from ES response
+     * @param Response $response Elastica Response object
+     * @return string|null
+     */
+    protected function getCreatedDocId(Response $response)
+    {
+        $data = $response->getData();
+        if (!empty($data['items'][0]['create']['_id'])) {
+            return $data['items'][0]['create']['_id'];
+        }
+    }
+
+    /**
+     * Retrieve document by id from Elasticsearch
+     * @param Client $client Elastica client
+     * @param string $index
+     * @param string $type
+     * @param string $documentId
+     * @return array
+     */
+    protected function getDocSourceFromElastic(Client $client, $index, $type, $documentId)
+    {
+        $resp = $client->request("/{$index}/{$type}/{$documentId}", Request::GET);
+        $data = $resp->getData();
+        if (!empty($data['_source'])) {
+            return $data['_source'];
+        }
+        return array();
+    }
+}