Source of file AbstractEndpoint.php
Size: 8,353 Bytes - Last Modified: 2021-01-12T22:04:13+00:00
C:/Users/MAKS/Code/_PROJECTS/amqp-agent/src/RPC/AbstractEndpoint.php
| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102 
                                Covered by 16 test(s):
                            103 
 
                                Covered by 16 test(s):
                            104 
 
                                Covered by 16 test(s):
                            105106107108109110111 
 
                                Covered by 10 test(s):
                            112 
 
                                Covered by 10 test(s):
                            113114115116117118119120121122123 
 
                                Covered by 11 test(s):
                            124 
 
                                Covered by 11 test(s):
                            125 
 
                                Covered by 11 test(s):
                            126127128 
 
                                Covered by 11 test(s):
                            129 
 
                                Covered by 1 test(s):
                            130131132 
 
                                Covered by 11 test(s):
                            133134 
 
                                Covered by 11 test(s):
                            135 
 
                                Covered by 11 test(s):
                            136137 
 
                                Covered by 11 test(s):
                            138 
 
                                Covered by 11 test(s):
                            139140 
 
                                Covered by 11 test(s):
                            141142143144145146147148149 
 
                                Covered by 12 test(s):
                            150 
 
                                Covered by 7 test(s):
                            151152 
 
                                Covered by 7 test(s):
                            153 
 
                                Covered by 7 test(s):
                            154155 
 
                                Covered by 7 test(s):
                            156 
 
                                Covered by 7 test(s):
                            157158 
 
                                Covered by 12 test(s):
                            159160161162163164165166 
 
                                Covered by 16 test(s):
                            167 
 
                                Covered by 16 test(s):
                            168 
 
                                Covered by 16 test(s):
                            169 
 
                                Covered by 16 test(s):
                            170 
 
                                Covered by 10 test(s):
                            171172173 
 
                                Covered by 16 test(s):
                            174175176177178179180181182 
 
                                Covered by 1 test(s):
                            183184185186187188189190191192193 
 
                                Covered by 3 test(s):
                            194 
 
                                Covered by 3 test(s):
                            195 
 
                                Covered by 1 test(s):
                            196 
 
                                Covered by 1 test(s):
                            197198 
 
                                Covered by 3 test(s):
                            199200 
 
                                Covered by 2 test(s):
                            201 
 
                                Covered by 2 test(s):
                            202 
 
                                Covered by 2 test(s):
                            203 
 
                                Covered by 2 test(s):
                            204 
 
                                Covered by 2 test(s):
                            205 
 
                                Covered by 2 test(s):
                            206207208 
 
                                Covered by 2 test(s):
                            209 
 
                                Covered by 2 test(s):
                            210 
 
                                Covered by 2 test(s):
                            211 
 
                                Covered by 2 test(s):
                            212213214 
 
                                Covered by 2 test(s):
                            215216 
 
                                Covered by 2 test(s):
                            217 
 
                                Covered by 2 test(s):
                            218 
 
                                Covered by 2 test(s):
                            219 
 
                                Covered by 2 test(s):
                            220 
 
                                Covered by 2 test(s):
                            221 
 
                                Covered by 2 test(s):
                            222 
 
                                Covered by 2 test(s):
                            223224 
 
                                Covered by 2 test(s):
                            225 
 
                                Covered by 2 test(s):
                            226 
 
                                Covered by 2 test(s):
                            227228229 
 
                                Covered by 2 test(s):
                            230231 
 
                                Covered by 2 test(s):
                            232 
 
                                Covered by 2 test(s):
                            233 
 
                                Covered by 2 test(s):
                            234235236237 
 
                                Covered by 2 test(s):
                            238 
 
                                Covered by 2 test(s):
                            239240241 
 
                                Covered by 2 test(s):
                            242243 
 
                                Covered by 2 test(s):
                            244245 
 
                                Covered by 2 test(s):
                            246 
 
                                Covered by 1 test(s):
                            247248 
 
                                Covered by 1 test(s):
                            249 
 
                                Covered by 1 test(s):
                            250251252 
 
                                Covered by 2 test(s):
                            253 
 
                                Covered by 1 test(s):
                            254 
 
                                Covered by 1 test(s):
                            255256257258259260261262263264265266267268269270271272273274275276277278 
 
                                Covered by 6 test(s):
                            279 
 
                                Covered by 6 test(s):
                            280 
 
                                Covered by 6 test(s):
                            281 
 
                                Covered by 6 test(s):
                            282 
 
                                Covered by 6 test(s):
                            283284285 
 
                                Covered by 6 test(s):
                            286287 
 
                                Covered by 3 test(s):
                            288289290291292293294295296297 
 | <?php/** * @author Marwan Al-Soltany <MarwanAlsoltany@gmail.com> * @copyright Marwan Al-Soltany 2020 * For the full copyright and license information, please view * the LICENSE file that was distributed with this source code. */declare(strict_types=1); namespace MAKS\AmqpAgent\RPC; use Exception; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Channel\AMQPChannel; use PhpAmqpLib\Message\AMQPMessage; use MAKS\AmqpAgent\RPC\AbstractEndpointInterface; use MAKS\AmqpAgent\Helper\EventTrait; use MAKS\AmqpAgent\Exception\MagicMethodsExceptionsTrait; use MAKS\AmqpAgent\Exception\RPCEndpointException; use MAKS\AmqpAgent\Config\RPCEndpointParameters as Parameters; /** * An abstract class implementing the basic functionality of an endpoint. * @since 2.0.0 * @api */abstract class AbstractEndpoint implements AbstractEndpointInterface {use MagicMethodsExceptionsTrait; use EventTrait; /**      * The connection options of the RPC endpoint.     * @var array     */protected $connectionOptions; /**      * The queue name of the RPC endpoint.     * @var string     */protected $queueName; /**      * Whether the endpoint is connected to RabbitMQ server or not.     * @var bool     */protected $connected; /**      * The endpoint connection.     * @var AMQPStreamConnection     */protected $connection; /**      * The endpoint channel.     * @var AMQPChannel     */protected $channel; /**      * The request body.     * @var string     */protected $requestBody; /**      * Requests conveyor.     * @var string     */protected $requestQueue; /**      * The response body.     * @var string     */protected $responseBody; /**      * Responses conveyor.     * @var string     */protected $responseQueue; /**      * Correlation ID of the last request/response.     * @var string     */protected $correlationId; /**      * Class constructor.     * @param array $connectionOptions [optional] The overrides for the default connection options of the RPC endpoint.     * @param string $queueName [optional] The override for the default queue name of the RPC endpoint.     */public function __construct(?array $connectionOptions = [], ?string $queueName = null) { $this->connectionOptions = Parameters::patch($connectionOptions, 'RPC_CONNECTION_OPTIONS'); $this->queueName = empty($queueName) ? Parameters::RPC_QUEUE_NAME : $queueName; } /**      * Closes the connection with RabbitMQ server before destroying the object.     */public function __destruct() { $this->disconnect(); } /**      * Opens a connection with RabbitMQ server.     * @param array|null $connectionOptions [optional] The overrides for the default connection options of the RPC endpoint.     * @return self     * @throws RPCEndpointException If the endpoint is already connected.     */public function connect(?array $connectionOptions = []) { $this->connectionOptions = Parameters::patchWith( $connectionOptions ?? [], $this->connectionOptions ); if ($this->isConnected()) { throw new RPCEndpointException('Endpoint is already connected!'); } $parameters = array_values($this->connectionOptions); $this->connection = new AMQPStreamConnection(...$parameters); $this->trigger('connection.after.open', [$this->connection]); $this->channel = $this->connection->channel(); $this->trigger('channel.after.open', [$this->channel]); return $this; } /**      * Closes the connection with RabbitMQ server.     * @return void     */public function disconnect(): void { if ($this->isConnected()) { $this->connected = null; $this->trigger('channel.before.close', [$this->channel]); $this->channel->close(); $this->trigger('connection.before.close', [$this->connection]); $this->connection->close(); } } /**      * Returns whether the endpoint is connected or not.     * @return bool     */public function isConnected(): bool { $this->connected = ( isset($this->connection) && isset($this->channel) && $this->connection->isConnected() && $this->channel->is_open() ); return $this->connected; } /**      * Returns the connection used by the endpoint.     * @return AMQPStreamConnection     */public function getConnection(): AMQPStreamConnection { return $this->connection; } /**      * The time needed for the round-trip to RabbitMQ server in milliseconds.     * Note that if the endpoint is not connected yet, this method will establish a new connection only for checking.     * @return float A two decimal points rounded float.     */final public function ping(): float { try { $pingConnection = $this->connection; if (!isset($pingConnection) || !$pingConnection->isConnected()) { $parameters = array_values($this->connectionOptions); $pingConnection = new AMQPStreamConnection(...$parameters); } $pingChannel = $pingConnection->channel(); [$pingQueue] = $pingChannel->queue_declare( null, false, false, true, true ); $pingChannel->basic_qos( null, 1, null ); $pingEcho = null; $pingChannel->basic_consume( $pingQueue, null, false, false, false, false, function ($message) use (&$pingEcho) { $message->ack(); $pingEcho = $message->body; } ); $pingStartTime = microtime(true); $pingChannel->basic_publish( new AMQPMessage(__FUNCTION__), null, $pingQueue ); while (!$pingEcho) { $pingChannel->wait(); } $pingEndTime = microtime(true); $pingChannel->queue_delete($pingQueue); if ($pingConnection === $this->connection) { $pingChannel->close(); } else { $pingChannel->close(); $pingConnection->close(); } return round(($pingEndTime - $pingStartTime) * 1000, 2); } catch (Exception $error) { RPCEndpointException::rethrow($error); } } /**      * Hooking method based on events to manipulate the request/response during the endpoint/message life cycle.     * Check out `self::$events` via `self::getEvents()` after processing at least one request/response to see all available events.     *     * The parameters will be passed to the callback as follows:     *      1. `$listenedOnObject` (first segment of event name e.g. `'connection.after.open'` will be `$connection`),     *      2. `$calledOnObject` (the object this method was called on e.g. `$endpoint`),     *      3. `$eventName` (the event was listened on e.g. `'connection.after.open'`).     * ```     * $endpoint->on('connection.after.open', function ($connection, $endpoint, $event) {     *      ...     * });     * ```     * @param string $event The event to listen on.     * @param callable $callback The callback to execute.     * @return self     */final public function on(string $event, callable $callback) { $this->bind($event, function (...$arguments) use ($event, $callback) { call_user_func_array( $callback, array_merge( $arguments, [$this, $event] ) ); }); return $this; } /**      * Hook method to manipulate the message (request/response) when extending the class.     * @param AMQPMessage $message     * @return string     */abstract protected function callback(AMQPMessage $message): string; } |