瀏覽代碼

Add handleBatch implementation for PhpAmqpLib

Ronald Drenth 10 年之前
父節點
當前提交
0d2bef0579
共有 1 個文件被更改,包括 65 次插入16 次删除
  1. 65 16
      src/Monolog/Handler/AmqpHandler.php

+ 65 - 16
src/Monolog/Handler/AmqpHandler.php

@@ -55,18 +55,12 @@ class AmqpHandler extends AbstractProcessingHandler
     protected function write(array $record)
     protected function write(array $record)
     {
     {
         $data = $record["formatted"];
         $data = $record["formatted"];
-
-        $routingKey = sprintf(
-            '%s.%s',
-            // TODO 2.0 remove substr call
-            substr($record['level_name'], 0, 4),
-            $record['channel']
-        );
+        $routingKey = $this->getRoutingKey($record);
 
 
         if ($this->exchange instanceof AMQPExchange) {
         if ($this->exchange instanceof AMQPExchange) {
             $this->exchange->publish(
             $this->exchange->publish(
                 $data,
                 $data,
-                strtolower($routingKey),
+                $routingKey,
                 0,
                 0,
                 array(
                 array(
                     'delivery_mode' => 2,
                     'delivery_mode' => 2,
@@ -75,17 +69,72 @@ class AmqpHandler extends AbstractProcessingHandler
             );
             );
         } else {
         } else {
             $this->exchange->basic_publish(
             $this->exchange->basic_publish(
-                new AMQPMessage(
-                    (string) $data,
-                    array(
-                        'delivery_mode' => 2,
-                        'content_type' => 'application/json',
-                    )
-                ),
+                $this->createAmqpMessage($data),
+                $this->exchangeName,
+                $routingKey
+            );
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public function handleBatch(array $records)
+    {
+        if ($this->exchange instanceof AMQPExchange) {
+            parent::handleBatch($records);
+            return;
+        }
+
+        foreach ($records as $record) {
+            if (!$this->isHandling($record)) {
+                continue;
+            }
+
+            $record = $this->processRecord($record);
+            $data = $this->getFormatter()->format($record);
+
+            $this->exchange->batch_basic_publish(
+                $this->createAmqpMessage($data),
                 $this->exchangeName,
                 $this->exchangeName,
-                strtolower($routingKey)
+                $this->getRoutingKey($record)
             );
             );
         }
         }
+
+        $this->exchange->publish_batch();
+    }
+
+    /**
+     * Gets the routing key for the AMQP exchange
+     *
+     * @param array $record
+     * @return string
+     */
+    private function getRoutingKey(array $record)
+    {
+        $routingKey = sprintf(
+            '%s.%s',
+            // TODO 2.0 remove substr call
+            substr($record['level_name'], 0, 4),
+            $record['channel']
+        );
+
+        return strtolower($routingKey);
+    }
+
+    /**
+     * @param string $data
+     * @return AMQPMessage
+     */
+    private function createAmqpMessage($data)
+    {
+        return new AMQPMessage(
+            (string) $data,
+            array(
+                'delivery_mode' => 2,
+                'content_type' => 'application/json',
+            )
+        );
     }
     }
 
 
     /**
     /**