|
|
@@ -27,6 +27,29 @@ class AmqpHandler extends AbstractProcessingHandler
|
|
|
* @var AMQPExchange|AMQPChannel $exchange
|
|
|
*/
|
|
|
protected $exchange;
|
|
|
+ /** @var array */
|
|
|
+ private $extraAttributes = [];
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @return array
|
|
|
+ */
|
|
|
+ public function getExtraAttributes(): array
|
|
|
+ {
|
|
|
+ return $this->extraAttributes;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @param array $extraAttributes One of content_type, content_encoding,
|
|
|
+ * message_id, user_id, app_id, delivery_mode,
|
|
|
+ * priority, timestamp, expiration, type
|
|
|
+ * or reply_to, headers.
|
|
|
+ * @return AmqpHandler
|
|
|
+ */
|
|
|
+ public function setExtraAttributes(array $extraAttributes): self
|
|
|
+ {
|
|
|
+ $this->extraAttributes = $extraAttributes;
|
|
|
+ return $this;
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
|
* @var string
|
|
|
@@ -60,14 +83,18 @@ class AmqpHandler extends AbstractProcessingHandler
|
|
|
$routingKey = $this->getRoutingKey($record);
|
|
|
|
|
|
if ($this->exchange instanceof AMQPExchange) {
|
|
|
+ $attributes = [
|
|
|
+ 'delivery_mode' => 2,
|
|
|
+ 'content_type' => 'application/json',
|
|
|
+ ];
|
|
|
+ if ($this->extraAttributes) {
|
|
|
+ $attributes = array_merge($attributes, $this->extraAttributes);
|
|
|
+ }
|
|
|
$this->exchange->publish(
|
|
|
$data,
|
|
|
$routingKey,
|
|
|
0,
|
|
|
- [
|
|
|
- 'delivery_mode' => 2,
|
|
|
- 'content_type' => 'application/json',
|
|
|
- ]
|
|
|
+ $attributes
|
|
|
);
|
|
|
} else {
|
|
|
$this->exchange->basic_publish(
|