Source of file ClientEndpoint.php
Size: 5,226 Bytes - Last Modified: 2021-01-12T22:04:13+00:00
C:/Users/MAKS/Code/_PROJECTS/amqp-agent/src/RPC/ClientEndpoint.php
| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546 
                                Covered by 3 test(s):
                            4748 
 
                                Covered by 3 test(s):
                            49 
 
                                Covered by 3 test(s):
                            50 
 
                                Covered by 3 test(s):
                            51 
 
                                Covered by 3 test(s):
                            52 
 
                                Covered by 3 test(s):
                            53 
 
                                Covered by 3 test(s):
                            54 
 
                                Covered by 3 test(s):
                            555657 
 
                                Covered by 3 test(s):
                            58 
 
                                Covered by 3 test(s):
                            59 
 
                                Covered by 3 test(s):
                            60 
 
                                Covered by 3 test(s):
                            61 
 
                                Covered by 3 test(s):
                            62 
 
                                Covered by 3 test(s):
                            63 
 
                                Covered by 3 test(s):
                            6465 
 
                                Covered by 2 test(s):
                            66 
 
                                Covered by 3 test(s):
                            67686970 
 
                                Covered by 3 test(s):
                            717273747576777879808182 
 
                                Covered by 3 test(s):
                            83 
 
                                Covered by 1 test(s):
                            848586 
 
                                Covered by 2 test(s):
                            87 
 
                                Covered by 2 test(s):
                            88 
 
                                Covered by 2 test(s):
                            89 
 
                                Covered by 2 test(s):
                            90 
 
                                Covered by 2 test(s):
                            9192 
 
                                Covered by 2 test(s):
                            93 
 
                                Covered by 2 test(s):
                            94 
 
                                Covered by 2 test(s):
                            95 
 
                                Covered by 2 test(s):
                            9697 
 
                                Covered by 2 test(s):
                            98 
 
                                Covered by 2 test(s):
                            99 
 
                                Covered by 2 test(s):
                            100 
 
                                Covered by 2 test(s):
                            101 
 
                                Covered by 2 test(s):
                            102 
 
                                Covered by 2 test(s):
                            103104105 
 
                                Covered by 2 test(s):
                            106107 
 
                                Covered by 2 test(s):
                            108 
 
                                Covered by 2 test(s):
                            109 
 
                                Covered by 2 test(s):
                            110 
 
                                Covered by 2 test(s):
                            111112113 
 
                                Covered by 2 test(s):
                            114115 
 
                                Covered by 2 test(s):
                            116 
 
                                Covered by 2 test(s):
                            117118119 
 
                                Covered by 1 test(s):
                            120121122123124125126127128129130131132 
 
                                Covered by 1 test(s):
                            133134135136137138139140141142143 
 
                                Covered by 2 test(s):
                            144145 
 
                                Covered by 2 test(s):
                            146 
 
                                Covered by 1 test(s):
                            147 
 
                                Covered by 1 test(s):
                            148 
 
                                Covered by 1 test(s):
                            149150151 
 
                                Covered by 1 test(s):
                            152 
 
                                Covered by 1 test(s):
                            153 
 
                                Covered by 1 test(s):
                            154 
 
                                Covered by 1 test(s):
                            155 
 
                                Covered by 1 test(s):
                            156157158159160161162163164165166167 
 
                                Covered by 1 test(s):
                            168169170 
 | <?php/** * @author Marwan Al-Soltany <MarwanAlsoltany@gmail.com> * @copyright Marwan Al-Soltany 2020 * For the full copyright and license information, please view * the LICENSE file that was distributed with this source code. */declare(strict_types=1); namespace MAKS\AmqpAgent\RPC; use PhpAmqpLib\Message\AMQPMessage; use MAKS\AmqpAgent\Helper\ClassProxy; use MAKS\AmqpAgent\Helper\IDGenerator; use MAKS\AmqpAgent\RPC\AbstractEndpoint; use MAKS\AmqpAgent\RPC\ClientEndpointInterface; use MAKS\AmqpAgent\Exception\RPCEndpointException; /** * A class specialized in requesting. Implementing only the methods needed for a client. * * Example: * ``` * $clientEndpoint = new ClientEndpoint(); * $clientEndpoint->on('some.event', function () { ... }); * $clientEndpoint->connect(); * $clientEndpoint->request('Message Body', 'queue.name'); * $clientEndpoint->disconnect(); * ``` * * @since 2.0.0 * @api */class ClientEndpoint extends AbstractEndpoint implements ClientEndpointInterface {/**      * Opens a connection with RabbitMQ server.     * @param array|null $connectionOptions     * @return self     * @throws RPCEndpointException     */public function connect(?array $connectionOptions = []) { parent::connect($connectionOptions); if ($this->isConnected()) { list($this->responseQueue, , ) = $this->channel->queue_declare( null, false, false, true, false ); $this->channel->basic_consume( $this->responseQueue, null, false, false, false, false, function ($message) { ClassProxy::call($this, 'onResponse', $message); } ); } return $this; } /**      * Sends the passed request to the server using the passed queue.     * @param string|AMQPMessage $request The request body or an `AMQPMessage` instance.     * @param string|null $queueName [optional] The name of queue to send through.     * @return string The response body.     * @throws RPCEndpointException If the client is not connected yet or if request Correlation ID does not match the one of the response.     */public function request($request, ?string $queueName = null): string { if (!$this->isConnected()) { throw new RPCEndpointException('Client is not connected yet!'); } $this->queueName = $queueName ?? $this->queueName; $this->requestBody = $request instanceof AMQPMessage ? $request->body : (string)$request; $this->responseBody = null; $this->requestQueue = $this->queueName; $this->correlationId = IDGenerator::generateHash(); $message = $request instanceof AMQPMessage ? $request : new AMQPMessage((string)$request); $message->set('reply_to', $this->responseQueue); $message->set('correlation_id', $this->correlationId); $message->set('timestamp', time()); $this->channel->queue_declare( $this->requestQueue, false, false, false, false ); $this->trigger('request.before.send', [$message]); $this->channel->basic_publish( $message, null, $this->requestQueue ); $this->trigger('request.after.send', [$message]); while ($this->responseBody === null) { $this->channel->wait(); } return $this->responseBody; } /**      * Sends the passed request to the server using the passed queue.     * Alias for `self::request()`.     * @param string|AMQPMessage $request The request body or an `AMQPMessage` instance.     * @param string|null $queueName [optional] The name of queue to send through.     * @return string The response body.     * @throws RPCEndpointException If the client is not connected yet or if request Correlation ID does not match the one of the response.     */public function call($request, ?string $queueName = null): string { return $this->request($request, $queueName); } /**      * Validates the response.     * @param AMQPMessage $response     * @return void     * @throws RPCEndpointException     */protected function onResponse(AMQPMessage $response): void { $this->trigger('response.on.get', [$response]); if ($this->correlationId === $response->get('correlation_id')) { $this->responseBody = $this->callback($response); $response->ack(); return; } throw new RPCEndpointException( sprintf( 'Correlation ID of the response "%s" does not match the one of the request "%s"!', $this->correlationId, (string)$response->get('correlation_id') ) ); } /**      * Returns the final response body.     * @param AMQPMessage $message     * @return string     */protected function callback(AMQPMessage $message): string { return $message->body; } } |