Source of file Publisher.php
Size: 15,026 Bytes - Last Modified: 2021-03-15T15:06:16+00:00
C:/Users/MAKS/Code/_PROJECTS/amqp-agent/src/Worker/Publisher.php
| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990 
                                Covered by 13 test(s):
                            91 
 
                                Covered by 13 test(s):
                            92 
 
                                Covered by 13 test(s):
                            93 
 
                                Covered by 13 test(s):
                            9495 
 
                                Covered by 13 test(s):
                            96 
 
                                Covered by 13 test(s):
                            979899100101102103104105106107108 
 
                                Covered by 8 test(s):
                            109 
 
                                Covered by 8 test(s):
                            110 
 
                                Covered by 4 test(s):
                            111112113 
 
                                Covered by 8 test(s):
                            114115116 
 
                                Covered by 8 test(s):
                            117 
 
                                Covered by 8 test(s):
                            118 
 
                                Covered by 8 test(s):
                            119 
 
                                Covered by 8 test(s):
                            120 
 
                                Covered by 8 test(s):
                            121 
 
                                Covered by 8 test(s):
                            122 
 
                                Covered by 8 test(s):
                            123 
 
                                Covered by 8 test(s):
                            124 
 
                                Covered by 8 test(s):
                            125 
 
                                Covered by 8 test(s):
                            126127128129130131 
 
                                Covered by 8 test(s):
                            132 
 
                                Covered by 4 test(s):
                            133134135 
 
                                Covered by 8 test(s):
                            136137138139140141142143144145146147 
 
                                Covered by 7 test(s):
                            148 
 
                                Covered by 7 test(s):
                            149 
 
                                Covered by 4 test(s):
                            150151152 
 
                                Covered by 7 test(s):
                            153154155 
 
                                Covered by 7 test(s):
                            156 
 
                                Covered by 7 test(s):
                            157 
 
                                Covered by 7 test(s):
                            158 
 
                                Covered by 7 test(s):
                            159 
 
                                Covered by 7 test(s):
                            160 
 
                                Covered by 7 test(s):
                            161 
 
                                Covered by 7 test(s):
                            162163164165166167 
 
                                Covered by 7 test(s):
                            168 
 
                                Covered by 4 test(s):
                            169170171 
 
                                Covered by 7 test(s):
                            172173174175176177178179180181182 
 
                                Covered by 5 test(s):
                            183 
 
                                Covered by 5 test(s):
                            184 
 
                                Covered by 4 test(s):
                            185186187 
 
                                Covered by 5 test(s):
                            188 
 
                                Covered by 5 test(s):
                            189190191 
 
                                Covered by 5 test(s):
                            192 
 
                                Covered by 5 test(s):
                            193 
 
                                Covered by 5 test(s):
                            194195196 
 
                                Covered by 5 test(s):
                            197 
 
                                Covered by 4 test(s):
                            198199200 
 
                                Covered by 5 test(s):
                            201202203204205206207208209210211212213 
 
                                Covered by 5 test(s):
                            214 
 
                                Covered by 5 test(s):
                            215 
 
                                Covered by 3 test(s):
                            216217218 
 
                                Covered by 5 test(s):
                            219220 
 
                                Covered by 5 test(s):
                            221222 
 
                                Covered by 5 test(s):
                            223224 
 
                                Covered by 5 test(s):
                            225 
 
                                Covered by 1 test(s):
                            226 
 
                                Covered by 5 test(s):
                            227 
 
                                Covered by 3 test(s):
                            228 
 
                                Covered by 4 test(s):
                            229 
 
                                Covered by 2 test(s):
                            230231 
 
                                Covered by 2 test(s):
                            232 
 
                                Covered by 2 test(s):
                            233 
 
                                Covered by 2 test(s):
                            234 
 
                                Covered by 2 test(s):
                            235 
 
                                Covered by 2 test(s):
                            236 
 
                                Covered by 2 test(s):
                            237238239240241242 
 
                                Covered by 3 test(s):
                            243 
 
                                Covered by 3 test(s):
                            244 
 
                                Covered by 3 test(s):
                            245 
 
                                Covered by 3 test(s):
                            246 
 
                                Covered by 3 test(s):
                            247 
 
                                Covered by 3 test(s):
                            248 
 
                                Covered by 3 test(s):
                            249250251252 
 
                                Covered by 3 test(s):
                            253254 
 
                                Covered by 3 test(s):
                            255256257 
 
                                Covered by 3 test(s):
                            258 
 
                                Covered by 2 test(s):
                            259260261 
 
                                Covered by 3 test(s):
                            262263264265266267268269270271272273274275 
 
                                Covered by 2 test(s):
                            276 
 
                                Covered by 2 test(s):
                            277 
 
                                Covered by 1 test(s):
                            278279280 
 
                                Covered by 2 test(s):
                            281282 
 
                                Covered by 2 test(s):
                            283284 
 
                                Covered by 2 test(s):
                            285 
 
                                Covered by 2 test(s):
                            286 
 
                                Covered by 2 test(s):
                            287288 
 
                                Covered by 2 test(s):
                            289290 
 
                                Covered by 2 test(s):
                            291 
 
                                Covered by 2 test(s):
                            292 
 
                                Covered by 1 test(s):
                            293294 
 
                                Covered by 1 test(s):
                            295296297 
 
                                Covered by 1 test(s):
                            298 
 
                                Covered by 1 test(s):
                            299 
 
                                Covered by 1 test(s):
                            300 
 
                                Covered by 1 test(s):
                            301 
 
                                Covered by 1 test(s):
                            302303 
 
                                Covered by 1 test(s):
                            304305306307308 
 
                                Covered by 2 test(s):
                            309 
 
                                Covered by 2 test(s):
                            310 
 
                                Covered by 2 test(s):
                            311 
 
                                Covered by 2 test(s):
                            312 
 
                                Covered by 2 test(s):
                            313 
 
                                Covered by 2 test(s):
                            314 
 
                                Covered by 2 test(s):
                            315316317 
 
                                Covered by 2 test(s):
                            318319 
 
                                Covered by 2 test(s):
                            320321322323324325326327328329330331332333334335336337 
 
                                Covered by 1 test(s):
                            338339340 
 
                                Covered by 1 test(s):
                            341342 
 
                                Covered by 1 test(s):
                            343344345 
 
                                Covered by 1 test(s):
                            346 
 
                                Covered by 1 test(s):
                            347348349 
 
                                Covered by 1 test(s):
                            350351352353354355356357358 
 
                                Covered by 3 test(s):
                            359 
 
                                Covered by 3 test(s):
                            360 
 
                                Covered by 3 test(s):
                            361 
 
                                Covered by 3 test(s):
                            362363 
 
                                Covered by 3 test(s):
                            364365366367368369370371372373374375 
 
                                Covered by 2 test(s):
                            376 
 
                                Covered by 2 test(s):
                            377 
 
                                Covered by 2 test(s):
                            378379 
 
                                Covered by 1 test(s):
                            380 
 
                                Covered by 1 test(s):
                            381 
 
                                Covered by 1 test(s):
                            382383 
 
                                Covered by 1 test(s):
                            384385 
 | <?php/** * @author Marwan Al-Soltany <MarwanAlsoltany@gmail.com> * @copyright Marwan Al-Soltany 2020 * For the full copyright and license information, please view * the LICENSE file that was distributed with this source code. */declare(strict_types=1); namespace MAKS\AmqpAgent\Worker; use PhpAmqpLib\Channel\AMQPChannel; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Exception\AMQPInvalidArgumentException; use PhpAmqpLib\Exception\AMQPTimeoutException; use PhpAmqpLib\Exception\AMQPConnectionBlockedException; use PhpAmqpLib\Exception\AMQPConnectionClosedException; use PhpAmqpLib\Exception\AMQPChannelClosedException; use MAKS\AmqpAgent\Worker\AbstractWorker; use MAKS\AmqpAgent\Worker\PublisherInterface; use MAKS\AmqpAgent\Worker\WorkerFacilitationInterface; use MAKS\AmqpAgent\Exception\AmqpAgentException as Exception; use MAKS\AmqpAgent\Config\PublisherParameters as Parameters; /** * A class specialized in publishing. Implementing only the methods needed for a publisher. * * Example: * ``` * $publisher = new Publisher(); * $publisher->connect(); * $publisher->queue(); * $publisher->exchange(); * $publisher->bind(); * $publisher->publish('Some message!'); * $publisher->disconnect(); * ``` * * @since 1.0.0 * @api */class Publisher extends AbstractWorker implements PublisherInterface, WorkerFacilitationInterface {/**      * The default exchange options that the worker should use when no overrides are provided.     * @var array     */protected $exchangeOptions; /**      * The default bind options that the worker should use when no overrides are provided.     * @var array     */protected $bindOptions; /**      * The default message options that the worker should use when no overrides are provided.     * @var array     */protected $messageOptions; /**      * The default publish options that the worker should use when no overrides are provided.     * @var array     */protected $publishOptions; /**      * Publisher object constructor.     * @param array $connectionOptions [optional] The overrides for the default connection options of the worker.     * @param array $channelOptions [optional] The overrides for the default channel options of the worker.     * @param array $queueOptions [optional] The overrides for the default queue options of the worker.     * @param array $exchangeOptions [optional] The overrides for the default exchange options of the worker.     * @param array $bindOptions [optional] The overrides for the default bind options of the worker.     * @param array $messageOptions [optional] The overrides for the default message options of the worker.     * @param array $publishOptions [optional] The overrides for the default publish options of the worker.     */public function __construct( array $connectionOptions = [], array $channelOptions = [], array $queueOptions = [], array $exchangeOptions = [], array $bindOptions = [], array $messageOptions = [], array $publishOptions = [] ) { $this->exchangeOptions = Parameters::patch($exchangeOptions, 'EXCHANGE_OPTIONS'); $this->bindOptions = Parameters::patch($bindOptions, 'BIND_OPTIONS'); $this->messageOptions = Parameters::patch($messageOptions, 'MESSAGE_OPTIONS'); $this->publishOptions = Parameters::patch($publishOptions, 'PUBLISH_OPTIONS'); parent::__construct($connectionOptions, $channelOptions, $queueOptions); } /**      * Declares an exchange on the default channel of the worker's connection to RabbitMQ server.     * @param array|null $parameters [optional] The overrides for the default exchange options of the worker.     * @param AMQPChannel|null $_channel [optional] The channel that should be used instead of the default worker's channel.     * @return self     * @throws AMQPTimeoutException     */public function exchange(?array $parameters = null, ?AMQPChannel $_channel = null) { $changes = null; if ($parameters) { $changes = $this->mutateClassMember('exchangeOptions', $parameters); } $channel = $_channel ?: $this->channel; try { $channel->exchange_declare( $this->exchangeOptions['exchange'], $this->exchangeOptions['type'], $this->exchangeOptions['passive'], $this->exchangeOptions['durable'], $this->exchangeOptions['auto_delete'], $this->exchangeOptions['internal'], $this->exchangeOptions['nowait'], $this->exchangeOptions['arguments'], $this->exchangeOptions['ticket'] ); } catch (AMQPTimeoutException $error) { // @codeCoverageIgnore Exception::rethrow($error); // @codeCoverageIgnore } if ($changes) { $this->mutateClassMember('exchangeOptions', $changes); } return $this; } /**      * Binds the default queue to the default exchange on the default channel of the worker's connection to RabbitMQ server.     * @param array|null $parameters [optional] The overrides for the default bind options of the worker.     * @param AMQPChannel|null $_channel [optional] The channel that should be used instead of the default worker's channel.     * @return self     * @throws AMQPTimeoutException     */public function bind(?array $parameters = null, ?AMQPChannel $_channel = null) { $changes = null; if ($parameters) { $changes = $this->mutateClassMember('bindOptions', $parameters); } $channel = $_channel ?: $this->channel; try { $channel->queue_bind( $this->bindOptions['queue'], $this->bindOptions['exchange'], $this->bindOptions['routing_key'], $this->bindOptions['nowait'], $this->bindOptions['arguments'], $this->bindOptions['ticket'] ); } catch (AMQPTimeoutException $error) { // @codeCoverageIgnore Exception::rethrow($error); // @codeCoverageIgnore } if ($changes) { $this->mutateClassMember('bindOptions', $changes); } return $this; } /**      * Returns an AMQPMessage object.     * @param string $body The body of the message.     * @param array|null $properties [optional] The overrides for the default properties of the default message options of the worker.     * @return AMQPMessage     */public function message(string $body, ?array $properties = null): AMQPMessage { $changes = null; if ($properties) { $changes = $this->mutateClassSubMember('messageOptions', 'properties', $properties); } if ($body) { $this->messageOptions['body'] = $body; } $message = new AMQPMessage( $this->messageOptions['body'], $this->messageOptions['properties'] ); if ($changes) { $this->mutateClassSubMember('messageOptions', 'properties', $changes); } return $message; } /**      * Publishes a message to the default exchange on the default channel of the worker's connection to RabbitMQ server.     * @param string|array|AMQPMessage $payload A string of the body of the message or an array of body and properties for the message or a AMQPMessage object.     * @param array|null $parameters [optional] The overrides for the default publish options of the worker.     * @param AMQPChannel|null $_channel [optional] The channel that should be used instead of the default worker's channel.     * @return self     * @throws Exception|AMQPChannelClosedException|AMQPConnectionClosedException|AMQPConnectionBlockedException     */public function publish($payload, ?array $parameters = null, ?AMQPChannel $_channel = null) { $changes = null; if ($parameters) { $changes = $this->mutateClassMember('publishOptions', $parameters); } $channel = $_channel ?: $this->channel; $originalMessage = $this->publishOptions['msg']; $message = $payload ?: $originalMessage; if ($message instanceof AMQPMessage) { $this->publishOptions['msg'] = $message; } elseif (is_array($message) && isset($message['body']) && isset($message['properties'])) { $this->publishOptions['msg'] = $this->message($message['body'], $message['properties']); } elseif (is_string($message)) { $this->publishOptions['msg'] = $this->message($message); } else { throw new Exception( sprintf( 'Payload must be a string, an array like %s, or an instance of "%s". The given parameter (data-type: %s) was none of them.', '["body" => "Message body!", "properties" ["key" => "value"]]', AMQPMessage::class, is_object($payload) ? get_class($payload) : gettype($payload) ) ); } try { $channel->basic_publish( $this->publishOptions['msg'], $this->publishOptions['exchange'], $this->publishOptions['routing_key'], $this->publishOptions['mandatory'], $this->publishOptions['immediate'], $this->publishOptions['ticket'] ); } catch (AMQPChannelClosedException | AMQPConnectionClosedException | AMQPConnectionBlockedException $error) { // @codeCoverageIgnore Exception::rethrow($error); // @codeCoverageIgnore } finally { // reverting messageOptions back to its state. $this->publishOptions['msg'] = $originalMessage; } if ($changes) { $this->mutateClassMember('publishOptions', $changes); } return $this; } /**      * Publishes a batch of messages to the default exchange on the default channel of the worker's connection to RabbitMQ server.     * @param string[]|array[]|AMQPMessage[] $messages An array of bodies of the messages or an array of arrays of body and properties for the messages or an array of AMQPMessage objects.     * @param int $batchSize [optional] The number of messages that should be published per batch.     * @param array|null $parameters [optional] The overrides for the default publish options of the worker.     * @param AMQPChannel|null $_channel [optional] The channel that should be used instead of the default worker's channel.     * @return self     * @throws Exception|AMQPChannelClosedException|AMQPConnectionClosedException|AMQPConnectionBlockedException     */public function publishBatch(array $messages, int $batchSize = 2500, ?array $parameters = null, ?AMQPChannel $_channel = null) { $changes = null; if ($parameters) { $changes = $this->mutateClassMember('publishOptions', $parameters); } $channel = $_channel ?: $this->channel; $originalMessage = $this->publishOptions['msg']; $count = count($messages); for ($i = 0; $i < $count; $i++) { $payload = $messages[$i]; $message = $payload ?: $originalMessage; if ($message instanceof AMQPMessage) { $this->publishOptions['msg'] = $message; } elseif (is_array($message) && isset($message['body']) && isset($message['properties'])) { $this->publishOptions['msg'] = $this->message($message['body'], $message['properties']); } elseif (is_string($message)) { $this->publishOptions['msg'] = $this->message($message); } else { throw new Exception( sprintf( 'Messages array elements must be either a string, an array like %s, or an instance of "%s". Element in index "%d" (data-type: %s) was none of them.', '["body" => "Message body!", "properties" ["key" => "value"]]', AMQPMessage::class, $i, is_object($payload) ? get_class($payload) : gettype($payload) ) ); } $channel->batch_basic_publish( $this->publishOptions['msg'], $this->publishOptions['exchange'], $this->publishOptions['routing_key'], $this->publishOptions['mandatory'], $this->publishOptions['immediate'], $this->publishOptions['ticket'] ); if ($i % $batchSize == 0) { try { $channel->publish_batch(); // @codeCoverageIgnoreStart } catch (AMQPConnectionBlockedException $e) { $tries = -1; do { sleep(1); $tries++; } while ($this->connection->isBlocked() && $tries >= 60); $channel->publish_batch(); } catch (AMQPChannelClosedException | AMQPConnectionClosedException | AMQPConnectionBlockedException $error) { Exception::rethrow($error); // @codeCoverageIgnoreEnd } } } try { $channel->publish_batch(); } catch (AMQPChannelClosedException | AMQPConnectionClosedException | AMQPConnectionBlockedException $error) { // @codeCoverageIgnore Exception::rethrow($error); // @codeCoverageIgnore } finally { // reverting messageOptions back to its state. $this->publishOptions['msg'] = $originalMessage; } if ($changes) { $this->mutateClassMember('publishOptions', $changes); } return $this; } /**      * Executes `self::connect()`, `self::queue()`, `self::exchange`, and `self::bind()` respectively.     * @return self     */public function prepare() { $this->connect(); $this->queue(); $this->exchange(); $this->bind(); return $this; } /**      * Executes `self::connect()`, `self::queue()`, `self::exchange`, `self::bind()`, `self::publish()`, and `self::disconnect()` respectively.     * @param string[]|array[]|AMQPMessage[] $messages An array of strings, arrays, or AMQPMessage objects (same as `self::publishBatch()`).     * @return void     * @throws Exception     */public function work($messages): void { try { $this->prepare(); foreach ($messages as $message) { $this->publish($message); } $this->disconnect(); } catch (Exception $error) { Exception::rethrow($error, null, false); } } } |